
Commit 172358d

committed
Pulled WriteAheadLog-related stuff from tdas/spark/tree/driver-ha-working
1 parent 342b57d commit 172358d

7 files changed, +770 -0 lines changed
Lines changed: 19 additions & 0 deletions
@@ -0,0 +1,19 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.streaming.storage

private[streaming] case class FileSegment(path: String, offset: Long, length: Int)
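Not part of the commit: FileSegment is the handle the write-ahead log returns for each record, naming the log file plus the byte offset and length of that record within it. A purely illustrative, self-contained example (all values hypothetical):

package org.apache.spark.streaming.storage

object FileSegmentSketch {
  // Illustrative values only; the "log-<start>-<stop>" file name matches the format
  // produced by WriteAheadLogManager.timeToLogFile later in this commit.
  val example = FileSegment("file:///tmp/wal-sketch/logs/log-0-60000", offset = 16L, length = 128)
}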
Lines changed: 72 additions & 0 deletions
@@ -0,0 +1,72 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.streaming.storage

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, Path}

private[streaming] object HdfsUtils {

  def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
    // HDFS is not thread-safe when getFileSystem is called, so synchronize on that
    val dfsPath = new Path(path)
    val dfs = this.synchronized {
      dfsPath.getFileSystem(conf)
    }
    // If the file exists and we have append support, append instead of creating a new file
    val stream: FSDataOutputStream = {
      if (dfs.isFile(dfsPath)) {
        if (conf.getBoolean("hdfs.append.support", false)) {
          dfs.append(dfsPath)
        } else {
          throw new IllegalStateException("File exists and there is no append support!")
        }
      } else {
        dfs.create(dfsPath)
      }
    }
    stream
  }

  def getInputStream(path: String, conf: Configuration): FSDataInputStream = {
    val dfsPath = new Path(path)
    val dfs = this.synchronized {
      dfsPath.getFileSystem(conf)
    }
    val instream = dfs.open(dfsPath)
    instream
  }

  def checkState(state: Boolean, errorMsg: => String) {
    if (!state) {
      throw new IllegalStateException(errorMsg)
    }
  }

  def getBlockLocations(path: String, conf: Configuration): Option[Array[String]] = {
    val dfsPath = new Path(path)
    val dfs = this.synchronized {
      dfsPath.getFileSystem(conf)
    }
    val fileStatus = dfs.getFileStatus(dfsPath)
    val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen))
    blockLocs.map(_.flatMap(_.getHosts))
  }
}
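Not part of the commit: a minimal sketch of the framing convention the log readers below rely on — a 4-byte length prefix followed by the record bytes — written and read back through HdfsUtils. It assumes a local-filesystem path, a default Hadoop Configuration, and that the code lives under org.apache.spark.streaming (the object is private[streaming]); the path is hypothetical.

package org.apache.spark.streaming.storage

import org.apache.hadoop.conf.Configuration

object HdfsUtilsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Use a fresh path per run: re-running against an existing file would hit the
    // "no append support" branch of getOutputStream and throw.
    val path = s"file:///tmp/wal-sketch/log-${System.currentTimeMillis()}"

    val payload = "hello wal".getBytes("UTF-8")
    val out = HdfsUtils.getOutputStream(path, conf)
    out.writeInt(payload.length) // length prefix
    out.write(payload)           // record bytes
    out.close()

    val in = HdfsUtils.getInputStream(path, conf)
    val buf = new Array[Byte](in.readInt())
    in.readFully(buf)
    in.close()
    assert(new String(buf, "UTF-8") == "hello wal")
  }
}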
Lines changed: 176 additions & 0 deletions
@@ -0,0 +1,176 @@
package org.apache.spark.streaming.storage

import java.nio.ByteBuffer

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{ExecutionContext, Future}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.spark.Logging
import org.apache.spark.streaming.storage.WriteAheadLogManager._
import org.apache.spark.streaming.util.{Clock, SystemClock}
import org.apache.spark.util.Utils

private[streaming] class WriteAheadLogManager(
    logDirectory: String,
    hadoopConf: Configuration,
    rollingIntervalSecs: Int = 60,
    maxFailures: Int = 3,
    callerName: String = "",
    clock: Clock = new SystemClock
  ) extends Logging {

  private val pastLogs = new ArrayBuffer[LogInfo]
  private val callerNameTag =
    if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
  implicit private val executionContext = ExecutionContext.fromExecutorService(
    Utils.newDaemonFixedThreadPool(1, threadpoolName))
  override protected val logName = s"WriteAheadLogManager $callerNameTag"

  private var currentLogPath: String = null
  private var currentLogWriter: WriteAheadLogWriter = null
  private var currentLogWriterStartTime: Long = -1L
  private var currentLogWriterStopTime: Long = -1L

  initializeOrRecover()

  def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
    var fileSegment: FileSegment = null
    var failures = 0
    var lastException: Exception = null
    var succeeded = false
    while (!succeeded && failures < maxFailures) {
      try {
        fileSegment = getLogWriter(clock.currentTime).write(byteBuffer)
        succeeded = true
      } catch {
        case ex: Exception =>
          lastException = ex
          logWarning("Failed to ...")
          resetWriter()
          failures += 1
      }
    }
    if (fileSegment == null) {
      throw lastException
    }
    fileSegment
  }

  def readFromLog(): Iterator[ByteBuffer] = synchronized {
    val logFilesToRead = pastLogs.map { _.path } ++ Option(currentLogPath)
    logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
    logFilesToRead.iterator.map { file =>
      logDebug(s"Creating log reader with $file")
      new WriteAheadLogReader(file, hadoopConf)
    } flatMap { x => x }
  }

  /**
   * Delete the log files that are older than the threshold time.
   *
   * It's important to note that the threshold time is based on the time stamps used in the log
   * files, and is therefore based on the local system time. So if coordination is necessary
   * between the node calculating the threshTime (say, the driver node) and the local system
   * time (say, on a worker node), the caller has to account for possible time skew.
   */
  def cleanupOldLogs(threshTime: Long): Unit = {
    val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
    logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
      s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")

    def deleteFiles() {
      oldLogFiles.foreach { logInfo =>
        try {
          val path = new Path(logInfo.path)
          val fs = hadoopConf.synchronized { path.getFileSystem(hadoopConf) }
          fs.delete(path, true)
          synchronized { pastLogs -= logInfo }
          logDebug(s"Cleared log file $logInfo")
        } catch {
          case ex: Exception =>
            logWarning(s"Error clearing log file $logInfo", ex)
        }
      }
      logInfo(s"Cleared log files in $logDirectory older than $threshTime")
    }
    if (!executionContext.isShutdown) {
      Future { deleteFiles() }
    }
  }

  def stop(): Unit = synchronized {
    if (currentLogWriter != null) {
      currentLogWriter.close()
    }
    executionContext.shutdown()
    logInfo("Stopped log manager")
  }

  private def getLogWriter(currentTime: Long): WriteAheadLogWriter = synchronized {
    if (currentLogWriter == null || currentTime > currentLogWriterStopTime) {
      resetWriter()
      if (currentLogPath != null) {
        pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, currentLogPath)
      }
      currentLogWriterStartTime = currentTime
      currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000)
      val newLogPath = new Path(logDirectory,
        timeToLogFile(currentLogWriterStartTime, currentLogWriterStopTime))
      currentLogPath = newLogPath.toString
      currentLogWriter = new WriteAheadLogWriter(currentLogPath, hadoopConf)
    }
    currentLogWriter
  }

  private def initializeOrRecover(): Unit = synchronized {
    val logDirectoryPath = new Path(logDirectory)
    val fileSystem = logDirectoryPath.getFileSystem(hadoopConf)

    if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
      val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath })
      pastLogs.clear()
      pastLogs ++= logFileInfo
      logInfo(s"Recovered ${logFileInfo.size} log files from $logDirectory")
      logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
    } else {
      fileSystem.mkdirs(logDirectoryPath,
        FsPermission.createImmutable(Integer.parseInt("770", 8).toShort))
      logInfo(s"Created $logDirectory for log files")
    }
  }

  private def resetWriter(): Unit = synchronized {
    if (currentLogWriter != null) {
      currentLogWriter.close()
      currentLogWriter = null
    }
  }
}

private[storage] object WriteAheadLogManager {

  case class LogInfo(startTime: Long, endTime: Long, path: String)

  val logFileRegex = """log-(\d+)-(\d+)""".r

  def timeToLogFile(startTime: Long, stopTime: Long): String = {
    s"log-$startTime-$stopTime"
  }

  def logFilesTologInfo(files: Seq[Path]): Seq[LogInfo] = {
    files.flatMap { file =>
      logFileRegex.findFirstIn(file.getName()) match {
        case Some(logFileRegex(startTimeStr, stopTimeStr)) =>
          val startTime = startTimeStr.toLong
          val stopTime = stopTimeStr.toLong
          Some(LogInfo(startTime, stopTime, file.toString))
        case None =>
          None
      }
    }.sortBy { _.startTime }
  }
}
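Not part of the commit: a rough sketch of the manager's lifecycle (create, write, recover, clean up, stop). It assumes the code lives under org.apache.spark.streaming (the class is private[streaming]), that WriteAheadLogWriter (referenced by the manager but not shown in this excerpt) is on the classpath, and that the log directory is writable; paths and data are illustrative only.

package org.apache.spark.streaming.storage

import java.nio.ByteBuffer
import org.apache.hadoop.conf.Configuration

object WriteAheadLogManagerSketch {
  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration()
    val manager = new WriteAheadLogManager(
      logDirectory = "file:///tmp/wal-sketch/logs", // hypothetical directory
      hadoopConf = hadoopConf,
      rollingIntervalSecs = 60,                     // roll to a new log file every minute
      callerName = "sketch")

    // Each write returns a FileSegment locating the record, usable later for random reads.
    val segment: FileSegment = manager.writeToLog(ByteBuffer.wrap("event-1".getBytes("UTF-8")))
    println(s"wrote record at ${segment.path}:${segment.offset} (${segment.length} bytes)")

    // Sequential recovery path: replay everything currently in the log directory.
    manager.readFromLog().foreach { bb =>
      println(s"recovered record of ${bb.remaining()} bytes")
    }

    // Drop log files whose end time is older than the threshold, then shut down.
    manager.cleanupOldLogs(System.currentTimeMillis() - 60 * 60 * 1000)
    manager.stop()
  }
}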
Lines changed: 50 additions & 0 deletions
@@ -0,0 +1,50 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.streaming.storage

import java.io.Closeable
import java.nio.ByteBuffer

import org.apache.hadoop.conf.Configuration

private[streaming] class WriteAheadLogRandomReader(path: String, conf: Configuration)
  extends Closeable {

  private val instream = HdfsUtils.getInputStream(path, conf)
  private var closed = false

  def read(segment: FileSegment): ByteBuffer = synchronized {
    assertOpen()
    instream.seek(segment.offset)
    val nextLength = instream.readInt()
    HdfsUtils.checkState(nextLength == segment.length,
      "Expected message length to be " + segment.length + ", but was " + nextLength)
    val buffer = new Array[Byte](nextLength)
    instream.readFully(buffer)
    ByteBuffer.wrap(buffer)
  }

  override def close(): Unit = synchronized {
    closed = true
    instream.close()
  }

  private def assertOpen() {
    HdfsUtils.checkState(!closed, "Stream is closed. Create a new Reader to read from the file.")
  }
}
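Not part of the commit: a sketch of the random-access read path, assuming a FileSegment previously returned by WriteAheadLogManager.writeToLog (or by the writer directly) and, as above, code placed under org.apache.spark.streaming to satisfy the private[streaming] visibility.

package org.apache.spark.streaming.storage

import java.nio.ByteBuffer
import org.apache.hadoop.conf.Configuration

object RandomReadSketch {
  def readSegment(segment: FileSegment, hadoopConf: Configuration): ByteBuffer = {
    val reader = new WriteAheadLogRandomReader(segment.path, hadoopConf)
    try {
      // Seeks to segment.offset, checks the stored length prefix against segment.length,
      // and returns the record bytes.
      reader.read(segment)
    } finally {
      reader.close()
    }
  }
}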
Lines changed: 76 additions & 0 deletions
@@ -0,0 +1,76 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.streaming.storage

import java.io.{EOFException, Closeable}
import java.nio.ByteBuffer

import org.apache.hadoop.conf.Configuration
import org.apache.spark.Logging

private[streaming] class WriteAheadLogReader(path: String, conf: Configuration)
  extends Iterator[ByteBuffer] with Closeable with Logging {

  private val instream = HdfsUtils.getInputStream(path, conf)
  private var closed = false
  private var nextItem: Option[ByteBuffer] = None

  override def hasNext: Boolean = synchronized {
    if (closed) {
      return false
    }

    if (nextItem.isDefined) { // handle the case where hasNext is called without calling next
      true
    } else {
      try {
        val length = instream.readInt()
        val buffer = new Array[Byte](length)
        instream.readFully(buffer)
        nextItem = Some(ByteBuffer.wrap(buffer))
        logTrace("Read next item " + nextItem.get)
        true
      } catch {
        case e: EOFException =>
          logDebug("Error reading next item, EOF reached", e)
          close()
          false
        case e: Exception =>
          logDebug("Error reading next item", e)
          close()
          throw e
      }
    }
  }

  override def next(): ByteBuffer = synchronized {
    val data = nextItem.getOrElse {
      close()
      throw new IllegalStateException(
        "next called without calling hasNext or after hasNext returned false")
    }
    nextItem = None // Ensure the next hasNext call loads new data.
    data
  }

  override def close(): Unit = synchronized {
    if (!closed) {
      instream.close()
    }
    closed = true
  }
}
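Not part of the commit: a sketch of sequential replay of a single log file, assuming an existing file written with the length-prefixed framing used by the writer and code placed under org.apache.spark.streaming; the path argument is illustrative.

package org.apache.spark.streaming.storage

import org.apache.hadoop.conf.Configuration

object SequentialReadSketch {
  def countRecords(logFilePath: String, hadoopConf: Configuration): Int = {
    val reader = new WriteAheadLogReader(logFilePath, hadoopConf)
    try {
      var count = 0
      while (reader.hasNext) {
        val record = reader.next() // one ByteBuffer per written item
        count += 1
      }
      count
    } finally {
      // The reader closes itself on EOF, but close() is idempotent, so always closing is safe.
      reader.close()
    }
  }
}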
