
Commit 5182ffb

harishreedharan authored and tdas committed
Added documentation
1 parent 172358d commit 5182ffb

5 files changed: +86 −18 lines changed

streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala

Lines changed: 38 additions & 6 deletions
@@ -13,6 +13,24 @@ import org.apache.spark.streaming.storage.WriteAheadLogManager._
 import org.apache.spark.streaming.util.{Clock, SystemClock}
 import org.apache.spark.util.Utils

+/**
+ * This class manages write ahead log files.
+ * - Writes records (bytebuffers) to periodically rotating log files.
+ * - Recovers the log files and reads the recovered records upon failure.
+ * - Cleans up old log files.
+ *
+ * Uses [[org.apache.spark.streaming.storage.WriteAheadLogWriter]] to write
+ * and [[org.apache.spark.streaming.storage.WriteAheadLogReader]] to read.
+ *
+ * @param logDirectory Directory where the rotating log files will be created.
+ * @param hadoopConf Hadoop configuration for reading/writing log files.
+ * @param rollingIntervalSecs The interval in seconds at which logs will be rolled over.
+ *                            Default is one minute.
+ * @param maxFailures Max number of failures tolerated for every attempt to write to the log.
+ *                    Default is three.
+ * @param callerName Optional name of the class that is using this manager.
+ * @param clock Optional clock that is used to check for rotation interval.
+ */
 private[streaming] class WriteAheadLogManager(
     logDirectory: String,
     hadoopConf: Configuration,
@@ -37,6 +55,7 @@ private[streaming] class WriteAheadLogManager(

   initializeOrRecover()

+  /** Write a byte buffer to the log file */
   def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
     var fileSegment: FileSegment = null
     var failures = 0
@@ -49,17 +68,27 @@ private[streaming] class WriteAheadLogManager(
       } catch {
         case ex: Exception =>
           lastException = ex
-          logWarning("Failed to ...")
+          logWarning("Failed to write to write ahead log")
           resetWriter()
           failures += 1
       }
     }
     if (fileSegment == null) {
+      logError(s"Failed to write to write ahead log after $failures failures")
       throw lastException
     }
     fileSegment
   }

+  /**
+   * Read all the existing logs from the log directory.
+   *
+   * Note that this is typically called when the caller is initializing and wants
+   * to recover past state from the write ahead logs (that is, before making any writes).
+   * If this is called after writes have been made using this manager, then it may not return
+   * the latest records. This does not deal with currently active log files, and
+   * hence the implementation is kept simple.
+   */
   def readFromLog(): Iterator[ByteBuffer] = synchronized {
     val logFilesToRead = pastLogs.map { _.path } ++ Option(currentLogPath)
     logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
@@ -73,7 +102,7 @@ private[streaming] class WriteAheadLogManager(
    * Delete the log files that are older than the threshold time.
    *
    * It's important to note that the threshold time is based on the timestamps used in the log
-   * files, and is therefore based on the local system time. So if there is coordination necessary
+   * files, which is usually based on the local system time. So if coordination is necessary
    * between the node calculating the threshTime (say, driver node), and the local system time
    * (say, worker node), the caller has to take account of possible time skew.
    */
@@ -92,7 +121,7 @@ private[streaming] class WriteAheadLogManager(
         logDebug(s"Cleared log file $logInfo")
       } catch {
         case ex: Exception =>
-          logWarning(s"Error clearing log file $logInfo", ex)
+          logWarning(s"Error clearing write ahead log file $logInfo", ex)
       }
     }
     logInfo(s"Cleared log files in $logDirectory older than $threshTime")
@@ -102,14 +131,16 @@ private[streaming] class WriteAheadLogManager(
     }
   }

+  /** Stop the manager, closing any open log writer */
   def stop(): Unit = synchronized {
     if (currentLogWriter != null) {
       currentLogWriter.close()
     }
     executionContext.shutdown()
-    logInfo("Stopped log manager")
+    logInfo("Stopped write ahead log manager")
   }

+  /** Get the current log writer, rotating to a new file if needed */
   private def getLogWriter(currentTime: Long): WriteAheadLogWriter = synchronized {
     if (currentLogWriter == null || currentTime > currentLogWriterStopTime) {
       resetWriter()
@@ -126,6 +157,7 @@ private[streaming] class WriteAheadLogManager(
     currentLogWriter
   }

+  /** Initialize the log directory or recover existing logs inside the directory */
   private def initializeOrRecover(): Unit = synchronized {
     val logDirectoryPath = new Path(logDirectory)
     val fileSystem = logDirectoryPath.getFileSystem(hadoopConf)
@@ -134,12 +166,12 @@ private[streaming] class WriteAheadLogManager(
       val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath })
       pastLogs.clear()
       pastLogs ++= logFileInfo
-      logInfo(s"Recovered ${logFileInfo.size} log files from $logDirectory")
+      logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
       logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
     } else {
       fileSystem.mkdirs(logDirectoryPath,
         FsPermission.createImmutable(Integer.parseInt("770", 8).toShort))
-      logInfo(s"Created ${logDirectory} for log files")
+      logInfo(s"Created ${logDirectory} for write ahead log files")
     }
   }
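Taken together, the documented API suggests the following lifecycle. A minimal sketch, assuming a caller inside the private[streaming] package; the directory path, caller name, and record contents are illustrative, not from the commit:

import java.nio.ByteBuffer
import org.apache.hadoop.conf.Configuration

val manager = new WriteAheadLogManager(
  logDirectory = "hdfs:///checkpoint/receivedData",  // illustrative path
  hadoopConf = new Configuration(),
  rollingIntervalSecs = 60,  // roll to a new log file every minute (the default)
  callerName = "WriteAheadLogExample")  // illustrative caller name

// Recover past records first, before any writes, as the readFromLog scaladoc advises
val recovered: Iterator[ByteBuffer] = manager.readFromLog()

// Each write returns a FileSegment locating the record for later random access
val segment = manager.writeToLog(ByteBuffer.wrap("block data".getBytes))

// Per the cleanupOldLogs scaladoc, leave headroom for clock skew between the
// node computing the threshold and the nodes that stamped the log files
manager.cleanupOldLogs(System.currentTimeMillis() - 60 * 1000)

manager.stop()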

streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala

Lines changed: 5 additions & 0 deletions
@@ -21,6 +21,11 @@ import java.nio.ByteBuffer

 import org.apache.hadoop.conf.Configuration

+/**
+ * A random access reader for reading write ahead log files written using
+ * [[org.apache.spark.streaming.storage.WriteAheadLogWriter]]. Given the file segment info,
+ * this reads the record (bytebuffer) from the log file.
+ */
 private[streaming] class WriteAheadLogRandomReader(path: String, conf: Configuration)
   extends Closeable {
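This is the read-side counterpart of writeToLog's returned FileSegment. A sketch of random access recovery, assuming a read(segment) method and that FileSegment carries the log file path (neither signature is shown in this diff):

import java.nio.ByteBuffer
import org.apache.hadoop.conf.Configuration

// segment: a FileSegment previously returned by WriteAheadLogManager.writeToLog
def readOneRecord(segment: FileSegment, conf: Configuration): ByteBuffer = {
  val reader = new WriteAheadLogRandomReader(segment.path, conf)  // assumes FileSegment.path
  try {
    reader.read(segment)  // assumed API: seek to the segment's offset and read one record
  } finally {
    reader.close()  // the class is Closeable, per the declaration above
  }
}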

streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala

Lines changed: 6 additions & 0 deletions
@@ -22,6 +22,12 @@ import java.nio.ByteBuffer
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.Logging

+/**
+ * A reader for reading write ahead log files written using
+ * [[org.apache.spark.streaming.storage.WriteAheadLogWriter]]. This reads
+ * the records (bytebuffers) in the log file sequentially and returns them as an
+ * iterator of bytebuffers.
+ */
 private[streaming] class WriteAheadLogReader(path: String, conf: Configuration)
   extends Iterator[ByteBuffer] with Closeable with Logging {
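Because the class extends Iterator[ByteBuffer] with Closeable, sequential recovery is ordinary iterator code. A minimal sketch; the log file path is illustrative:

import org.apache.hadoop.conf.Configuration

val reader = new WriteAheadLogReader("hdfs:///checkpoint/receivedData/log-0", new Configuration())
try {
  reader.foreach { record =>
    // record is one ByteBuffer written via writeToLog; replay it here
    println(s"Recovered a record of ${record.remaining()} bytes")
  }
} finally {
  reader.close()
}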

streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala

Lines changed: 8 additions & 6 deletions
@@ -26,17 +26,21 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem}
 import org.apache.spark.streaming.storage.FileSegment

-private[streaming] class WriteAheadLogWriter(path: String, conf: Configuration) extends Closeable {
+/**
+ * A writer for writing byte buffers to a write ahead log file.
+ */
+private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configuration)
+  extends Closeable {
   private val underlyingStream: Either[DataOutputStream, FSDataOutputStream] = {
     val uri = new URI(path)
-    val defaultFs = FileSystem.getDefaultUri(conf).getScheme
+    val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
     val isDefaultLocal = defaultFs == null || defaultFs == "file"

     if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
       assert(!new File(uri.getPath).exists)
       Left(new DataOutputStream(new BufferedOutputStream(new FileOutputStream(uri.getPath))))
     } else {
-      Right(HdfsUtils.getOutputStream(path, conf))
+      Right(HdfsUtils.getOutputStream(path, hadoopConf))
     }
   }

@@ -49,9 +53,7 @@ private[streaming] class WriteAheadLogWriter(path: String, conf: Configuration)
   private var closed = false


-  // Data is always written as:
-  // - Length - Long
-  // - Data - of length = Length
+  /** Write the bytebuffer to the log file */
   def write(data: ByteBuffer): FileSegment = synchronized {
     assertOpen()
     data.rewind() // Rewind to ensure all data in the buffer is retrieved
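The comment removed above documented the on-disk record framing: a length, then that many bytes of data. A minimal sketch of that framing with plain java.io streams, assuming (per the old comment) the length is written as a Long:

import java.io.DataOutputStream
import java.nio.ByteBuffer

def frameRecord(out: DataOutputStream, data: ByteBuffer): Unit = {
  data.rewind()  // ensure the whole buffer is read from the start
  val bytes = new Array[Byte](data.remaining())
  data.get(bytes)
  out.writeLong(bytes.length.toLong)  // Length - Long
  out.write(bytes)                    // Data - of length = Length
}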

streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala

Lines changed: 29 additions & 6 deletions
@@ -16,22 +16,28 @@
  */
 package org.apache.spark.streaming.storage

-import java.io.{DataInputStream, FileInputStream, File, RandomAccessFile}
+import java.io.{DataInputStream, File, FileInputStream, RandomAccessFile}
 import java.nio.ByteBuffer

-import scala.util.Random
-
 import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
 import scala.language.implicitConversions
+import scala.util.Random

-import com.google.common.io.Files
-import org.apache.hadoop.conf.Configuration
 import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually._
+
+import com.google.common.io.Files
 import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+
 import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.util.Utils
 import WriteAheadLogSuite._

+/**
+ * This test suite tests all classes related to write ahead logs.
+ */
 class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {

   val hadoopConf = new Configuration()
@@ -163,8 +169,25 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
     assert(dataToWrite.toList === readData.toList)
   }

+  test("WriteAheadLogManager - cleanup old logs") {
+    // Write data with the manager, then clean up old logs and verify they get deleted
+    val dataToWrite = generateRandomData(100)
+    val fakeClock = new ManualClock
+    val manager = new WriteAheadLogManager(tempDirectory.toString, hadoopConf,
+      rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock)
+    dataToWrite.foreach { item =>
+      fakeClock.addToTime(500) // half a second for each record
+      manager.writeToLog(item)
+    }
+    val logFiles = getLogFilesInDirectory(tempDirectory)
+    assert(logFiles.size > 1)
+    manager.cleanupOldLogs(fakeClock.currentTime() / 2)
+    eventually(timeout(1 second), interval(10 milliseconds)) {
+      assert(getLogFilesInDirectory(tempDirectory).size < logFiles.size)
+    }
+  }
+
   // TODO (Hari, TD): Test different failure conditions of writers and readers.
-  //  - Failure in the middle of write
   //  - Failure while reading incomplete/corrupt file
 }
