Commit 56cdbd6

BryanCutler authored and srowen committed
[SPARK-9844][CORE] File appender race condition during shutdown
When an Executor process is destroyed, the FileAppender that is asynchronously reading the process's stderr stream can throw an IOException during read because the stream is closed. Before the ExecutorRunner destroys the process, the FileAppender thread is flagged to stop.

This PR wraps the FileAppender's inputStream.read call in a try/catch block so that if an IOException is thrown while the thread is flagged to stop, the exception is safely ignored. Additionally, the FileAppender thread now uses Utils.tryWithSafeFinally to better log any exceptions that do occur. Unit tests were added to verify that an IOException is thrown and logged if the FileAppender is not flagged to stop, and that no IOException is logged when the flag is set.

Author: Bryan Cutler <cutlerb@gmail.com>

Closes apache#10714 from BryanCutler/file-appender-read-ioexception-SPARK-9844.
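The heart of the fix is a read loop that tolerates an asynchronous close once the stop flag has been set. Below is a minimal standalone sketch of that pattern; the markedForStop flag mirrors the patch, while the ReadLoopSketch harness around it is purely illustrative, not Spark code.

import java.io.{IOException, PipedInputStream, PipedOutputStream}

object ReadLoopSketch {
  @volatile var markedForStop = false

  def main(args: Array[String]): Unit = {
    val out = new PipedOutputStream()
    val in = new PipedInputStream(out)

    val reader = new Thread(new Runnable {
      def run(): Unit = {
        val buf = new Array[Byte](1024)
        var n = 0
        while (!markedForStop && n != -1) {
          try {
            n = in.read(buf)
          } catch {
            // The stream may be closed from another thread; once the reader
            // has been flagged to stop, the resulting IOException is expected.
            case _: IOException if markedForStop => // fall through; the loop exits
          }
          if (n > 0) {
            // a real appender would write buf(0 until n) to the log file here
          }
        }
      }
    })
    reader.start()

    // Shutdown order is the crux: set the flag first, then close the stream.
    markedForStop = true
    out.close()
    in.close()
    reader.join()
  }
}

If the stream were closed before the flag was set, the guard on the catch would not match and the IOException would escape to the outer error handler; that is exactly the "abrupt" case the first unit test below exercises.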
1 parent: 8f13cd4 · commit: 56cdbd6

File tree: 2 files changed (+95, -10 lines)


core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala

Lines changed: 18 additions & 10 deletions
@@ -17,7 +17,7 @@

 package org.apache.spark.util.logging

-import java.io.{File, FileOutputStream, InputStream}
+import java.io.{File, FileOutputStream, InputStream, IOException}

 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.util.{IntParam, Utils}
@@ -58,20 +58,28 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSi
   protected def appendStreamToFile() {
     try {
       logDebug("Started appending thread")
-      openFile()
-      val buf = new Array[Byte](bufferSize)
-      var n = 0
-      while (!markedForStop && n != -1) {
-        n = inputStream.read(buf)
-        if (n != -1) {
-          appendToFile(buf, n)
+      Utils.tryWithSafeFinally {
+        openFile()
+        val buf = new Array[Byte](bufferSize)
+        var n = 0
+        while (!markedForStop && n != -1) {
+          try {
+            n = inputStream.read(buf)
+          } catch {
+            // An InputStream can throw IOException during read if the stream is closed
+            // asynchronously, so once the appender has been flagged to stop, these are ignored
+            case _: IOException if markedForStop => // do nothing and proceed to stop appending
+          }
+          if (n > 0) {
+            appendToFile(buf, n)
+          }
         }
+      } {
+        closeFile()
       }
     } catch {
       case e: Exception =>
         logError(s"Error writing stream to file $file", e)
-    } finally {
-      closeFile()
     }
   }
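For context, Utils.tryWithSafeFinally is Spark's curried replacement for try/finally: the cleanup block runs on every exit path, but a failure thrown during cleanup does not mask the original exception from the main block. A rough sketch of the behavior, for illustration rather than Spark's exact source:

def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
  var originalThrowable: Throwable = null
  try {
    block
  } catch {
    case t: Throwable =>
      originalThrowable = t // remember the primary failure
      throw t
  } finally {
    try {
      finallyBlock
    } catch {
      // If the main block already failed, keep it as the primary exception
      // and attach the cleanup failure as suppressed instead of masking it.
      case t: Throwable if originalThrowable != null =>
        originalThrowable.addSuppressed(t)
    }
  }
}

In appendStreamToFile this guarantees closeFile() always runs, while the outer catch still logs the original read or write failure rather than a secondary failure from closing the file.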

core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala

Lines changed: 77 additions & 0 deletions
@@ -18,12 +18,17 @@
 package org.apache.spark.util

 import java.io._
+import java.util.concurrent.CountDownLatch

 import scala.collection.mutable.HashSet
 import scala.reflect._

 import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.Files
+import org.apache.log4j.{Appender, Level, Logger}
+import org.apache.log4j.spi.LoggingEvent
+import org.mockito.ArgumentCaptor
+import org.mockito.Mockito.{atLeast, mock, verify}
 import org.scalatest.BeforeAndAfter

 import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
@@ -188,6 +193,67 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
     testAppenderSelection[FileAppender, Any](rollingStrategy("xyz"))
   }

+  test("file appender async close stream abruptly") {
+    // Test the FileAppender's reaction to the InputStream closing, using a mock logging appender
+    val mockAppender = mock(classOf[Appender])
+    val loggingEventCaptor = new ArgumentCaptor[LoggingEvent]
+
+    // Make sure we are only logging errors
+    val logger = Logger.getRootLogger
+    logger.setLevel(Level.ERROR)
+    logger.addAppender(mockAppender)
+
+    val testOutputStream = new PipedOutputStream()
+    val testInputStream = new PipedInputStream(testOutputStream)
+
+    // Closing the stream before the appender tries to read will cause an IOException
+    testInputStream.close()
+    testOutputStream.close()
+    val appender = FileAppender(testInputStream, testFile, new SparkConf)
+
+    appender.awaitTermination()
+
+    // Since the InputStream was closed without first stopping the appender, an exception is logged
+    verify(mockAppender, atLeast(1)).doAppend(loggingEventCaptor.capture)
+    val loggingEvent = loggingEventCaptor.getValue
+    assert(loggingEvent.getThrowableInformation !== null)
+    assert(loggingEvent.getThrowableInformation.getThrowable.isInstanceOf[IOException])
+  }
+
+  test("file appender async close stream gracefully") {
+    // Test the FileAppender's reaction to the InputStream closing, using a mock logging appender
+    val mockAppender = mock(classOf[Appender])
+    val loggingEventCaptor = new ArgumentCaptor[LoggingEvent]
+
+    // Make sure we are only logging errors
+    val logger = Logger.getRootLogger
+    logger.setLevel(Level.ERROR)
+    logger.addAppender(mockAppender)
+
+    val testOutputStream = new PipedOutputStream()
+    val testInputStream = new PipedInputStream(testOutputStream) with LatchedInputStream
+
+    // Closing the stream before the appender tries to read will cause an IOException
+    testInputStream.close()
+    testOutputStream.close()
+    val appender = FileAppender(testInputStream, testFile, new SparkConf)
+
+    // Stop the appender before an IOException is thrown during read
+    testInputStream.latchReadStarted.await()
+    appender.stop()
+    testInputStream.latchReadProceed.countDown()
+
+    appender.awaitTermination()
+
+    // Make sure no IOException errors were logged, since the appender closed gracefully
+    verify(mockAppender, atLeast(0)).doAppend(loggingEventCaptor.capture)
+    import scala.collection.JavaConverters._
+    loggingEventCaptor.getAllValues.asScala.foreach { loggingEvent =>
+      assert(loggingEvent.getThrowableInformation === null
+        || !loggingEvent.getThrowableInformation.getThrowable.isInstanceOf[IOException])
+    }
+  }
+
   /**
    * Run the rolling file appender with data and see whether all the data was written correctly
    * across rolled over files.
@@ -228,4 +294,15 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
       file.getName.startsWith(testFile.getName)
     }.foreach { _.delete() }
   }
+
+  /** Used to synchronize when read is called on a stream */
+  private trait LatchedInputStream extends PipedInputStream {
+    val latchReadStarted = new CountDownLatch(1)
+    val latchReadProceed = new CountDownLatch(1)
+    abstract override def read(): Int = {
+      latchReadStarted.countDown()
+      latchReadProceed.await()
+      super.read()
+    }
+  }
 }
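The LatchedInputStream helper relies on Scala's stackable-trait pattern: abstract override lets a trait intercept read(), and super.read() is resolved at mix-in time to whatever concrete stream the trait is layered onto (here PipedInputStream). A self-contained illustration of the same technique, using hypothetical names (Latched, StackableTraitSketch) rather than the suite's:

import java.io.{ByteArrayInputStream, InputStream}
import java.util.concurrent.CountDownLatch

object StackableTraitSketch {
  // A reusable interceptor that parks every read() until released.
  trait Latched extends InputStream {
    val readStarted = new CountDownLatch(1)
    val readProceed = new CountDownLatch(1)
    abstract override def read(): Int = {
      readStarted.countDown() // signal that a reader has entered read()
      readProceed.await()     // hold it there until the caller releases it
      super.read()            // then delegate to the underlying stream
    }
  }

  def main(args: Array[String]): Unit = {
    // super.read() is bound at instantiation to ByteArrayInputStream.read()
    val in = new ByteArrayInputStream(Array[Byte](42)) with Latched

    val reader = new Thread(new Runnable {
      def run(): Unit = println(s"read: ${in.read()}")
    })
    reader.start()

    in.readStarted.await()     // the reader is now parked inside read()
    in.readProceed.countDown() // release it
    reader.join()              // prints "read: 42"
  }
}

This is what lets the graceful-shutdown test stop the appender while a read is provably in flight, removing timing guesswork from the race.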
