
Use Minicluster for WAL tests. #17


Merged: 5 commits, Oct 22, 2014
6 changes: 6 additions & 0 deletions pom.xml
@@ -406,6 +406,12 @@
       <artifactId>akka-slf4j_${scala.binary.version}</artifactId>
       <version>${akka.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>${akka.group}</groupId>
       <artifactId>akka-testkit_${scala.binary.version}</artifactId>
5 changes: 5 additions & 0 deletions streaming/pom.xml
@@ -68,6 +68,11 @@
       <artifactId>junit-interface</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
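With hadoop-minicluster on the test classpath, the WAL suites can run against a real in-process HDFS instead of stubbing out the filesystem. A minimal sketch of such a harness, assuming Hadoop 2.x's MiniDFSCluster builder API; the object name and log path below are illustrative, not from this PR:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hdfs.MiniDFSCluster

object WALMiniClusterSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Boot a single-datanode HDFS cluster inside the test JVM.
    val cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build()
    try {
      // The cluster binds to a random free port, so derive paths from its URI.
      val logDirectory = cluster.getFileSystem().getUri.toString + "/wal-test"
      // ... point the WAL writer/reader under test at logDirectory ...
    } finally {
      cluster.shutdown()
    }
  }
}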
streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala

@@ -25,10 +25,7 @@ private[streaming] object HdfsUtils {
     // HDFS is not thread-safe when getFileSystem is called, so synchronize on that

     val dfsPath = new Path(path)
-    val dfs =
-      this.synchronized {
-        dfsPath.getFileSystem(conf)
-      }
+    val dfs = getFileSystemForPath(dfsPath, conf)
     // If the file exists and we have append support, append instead of creating a new file
     val stream: FSDataOutputStream = {
       if (dfs.isFile(dfsPath)) {
@@ -46,27 +43,26 @@

   def getInputStream(path: String, conf: Configuration): FSDataInputStream = {
     val dfsPath = new Path(path)
-    val dfs = this.synchronized {
-      dfsPath.getFileSystem(conf)
-    }
+    val dfs = getFileSystemForPath(dfsPath, conf)
     val instream = dfs.open(dfsPath)
     instream
   }

   def checkState(state: Boolean, errorMsg: => String) {
-    if(!state) {
+    if (!state) {
       throw new IllegalStateException(errorMsg)
     }
   }

   def getBlockLocations(path: String, conf: Configuration): Option[Array[String]] = {
     val dfsPath = new Path(path)
-    val dfs =
-      this.synchronized {
-        dfsPath.getFileSystem(conf)
-      }
+    val dfs = getFileSystemForPath(dfsPath, conf)
     val fileStatus = dfs.getFileStatus(dfsPath)
     val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen))
     blockLocs.map(_.flatMap(_.getHosts))
   }
+
+  def getFileSystemForPath(path: Path, conf: Configuration) = synchronized {
+    path.getFileSystem(conf)
+  }
 }
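Taken together, the hunks above funnel every FileSystem lookup through the single synchronized getFileSystemForPath helper, since getFileSystem is not thread-safe here (per the comment in getOutputStream). A hedged round-trip sketch of the two stream helpers, written as if from a test inside the streaming package (HdfsUtils is private[streaming]); the path is hypothetical and hflush assumes Hadoop 2.x:

import org.apache.hadoop.conf.Configuration

object HdfsUtilsRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val path = "hdfs://localhost:9000/wal/log-0-1000" // hypothetical location

    val out = HdfsUtils.getOutputStream(path, conf)
    out.writeInt(42)
    out.hflush() // make the bytes visible to readers (Hadoop 2.x API)
    out.close()

    val in = HdfsUtils.getInputStream(path, conf)
    assert(in.readInt() == 42)
    in.close()
  }
}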
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala

@@ -63,14 +63,18 @@ private[streaming] class WriteAheadLogManager(
     Utils.newDaemonFixedThreadPool(1, threadpoolName))
   override protected val logName = s"WriteAheadLogManager $callerNameTag"

-  private var currentLogPath: String = null
+  private var currentLogPath: Option[String] = None
   private var currentLogWriter: WriteAheadLogWriter = null
   private var currentLogWriterStartTime: Long = -1L
   private var currentLogWriterStopTime: Long = -1L

   initializeOrRecover()

-  /** Write a byte buffer to the log file */
+  /**
+   * Write a byte buffer to the log file. This method synchronously writes the data in the
+   * ByteBuffer to HDFS. When this method returns, the data is guaranteed to have been flushed
+   * to HDFS, and will be available for readers to read.
+   */
   def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
     var fileSegment: FileSegment = null
     var failures = 0
@@ -99,13 +103,13 @@
    * Read all the existing logs from the log directory.
    *
    * Note that this is typically called when the caller is initializing and wants
-   * to recover past state from the write ahead logs (that is, before making any writes). 
+   * to recover past state from the write ahead logs (that is, before making any writes).
    * If this is called after writes have been made using this manager, then it may not return
    * the latest records. This does not deal with currently active log files, and
    * hence the implementation is kept simple.
    */
   def readFromLog(): Iterator[ByteBuffer] = synchronized {
-    val logFilesToRead = pastLogs.map{ _.path} ++ Option(currentLogPath)
+    val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath
     logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
     logFilesToRead.iterator.map { file =>
       logDebug(s"Creating log reader with $file")
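Turning currentLogPath into an Option[String] is what lets readFromLog drop the Option(...) wrapping above: in Scala an Option concatenates onto a collection as zero or one elements. A quick illustration with made-up values:

val pastPaths = Seq("log-0-100", "log-100-200") // hypothetical file names
val current: Option[String] = Some("log-200-300")
pastPaths ++ current // Seq(log-0-100, log-100-200, log-200-300)
pastPaths ++ (None: Option[String]) // unchanged; None contributes no elements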
@@ -130,7 +134,7 @@
     oldLogFiles.foreach { logInfo =>
       try {
         val path = new Path(logInfo.path)
-        val fs = hadoopConf.synchronized { path.getFileSystem(hadoopConf) }
+        val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
         fs.delete(path, true)
         synchronized { pastLogs -= logInfo }
         logDebug(s"Cleared log file $logInfo")
@@ -159,34 +163,30 @@
   private def getLogWriter(currentTime: Long): WriteAheadLogWriter = synchronized {
     if (currentLogWriter == null || currentTime > currentLogWriterStopTime) {
       resetWriter()
-      if (currentLogPath != null) {
-        pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, currentLogPath)
+      currentLogPath.foreach {
+        pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, _)
       }
       currentLogWriterStartTime = currentTime
       currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000)
       val newLogPath = new Path(logDirectory,
         timeToLogFile(currentLogWriterStartTime, currentLogWriterStopTime))
-      currentLogPath = newLogPath.toString
-      currentLogWriter = new WriteAheadLogWriter(currentLogPath, hadoopConf)
+      currentLogPath = Some(newLogPath.toString)
+      currentLogWriter = new WriteAheadLogWriter(currentLogPath.get, hadoopConf)
     }
     currentLogWriter
   }

   /** Initialize the log directory or recover existing logs inside the directory */
   private def initializeOrRecover(): Unit = synchronized {
     val logDirectoryPath = new Path(logDirectory)
-    val fileSystem = logDirectoryPath.getFileSystem(hadoopConf)
+    val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)

Owner commented on the line above: nit: extra space

     if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
       val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath })
       pastLogs.clear()
       pastLogs ++= logFileInfo
       logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
       logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
-    } else {
-      fileSystem.mkdirs(logDirectoryPath,
-        FsPermission.createImmutable(Integer.parseInt("770", 8).toShort))
-      logInfo(s"Created ${logDirectory} for write ahead log files")
     }
   }

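The strengthened writeToLog contract (synchronous, flushed before returning) rests on a retry loop that the diff truncates right after `var failures = 0`. A self-contained sketch of that shape; maxFailures and the attempt closure stand in for members not shown in this diff, and the real method presumably also resets its writer between attempts:

object RetryingWriteSketch {
  // Try a synchronous write up to maxFailures times (assumed >= 1);
  // rethrow the last failure if no attempt succeeds.
  def writeWithRetries[T](maxFailures: Int)(attempt: () => T): T = {
    var failures = 0
    var lastException: Exception = null
    while (failures < maxFailures) {
      try {
        return attempt() // returns only once the underlying write has flushed
      } catch {
        case ex: Exception =>
          lastException = ex
          failures += 1
      }
    }
    throw lastException
  }
}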
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala

@@ -56,7 +56,7 @@ private[streaming] class WriteAheadLogReader(path: String, conf: Configuration)
         close()
         false
       case e: Exception =>
-        logDebug("Error reading next item, EOF reached", e)
+        logWarning("Error while trying to read data from HDFS.", e)
         close()
         throw e
     }
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala

@@ -30,18 +30,8 @@ import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem}
  */
 private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configuration)
   extends Closeable {
-  private val underlyingStream: Either[DataOutputStream, FSDataOutputStream] = {
-    val uri = new URI(path)
-    val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
-    val isDefaultLocal = defaultFs == null || defaultFs == "file"
-
-    if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
-      assert(!new File(uri.getPath).exists)
-      Left(new DataOutputStream(new BufferedOutputStream(new FileOutputStream(uri.getPath))))
-    } else {
-      Right(HdfsUtils.getOutputStream(path, hadoopConf))
-    }
-  }
+  private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf)

   private lazy val hadoopFlushMethod = {
     val cls = classOf[FSDataOutputStream]
@@ -77,21 +67,14 @@ private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configuration)
     stream.close()
   }

-  private def stream(): DataOutputStream = {
-    underlyingStream.fold(x => x, x => x)
-  }
-
   private def getPosition(): Long = {
-    underlyingStream match {
-      case Left(localStream) => localStream.size
-      case Right(dfsStream) => dfsStream.getPos()
-    }
+    stream.getPos()
   }

   private def flush() {
-    underlyingStream match {
-      case Left(localStream) => localStream.flush
-      case Right(dfsStream) => hadoopFlushMethod.foreach { _.invoke(dfsStream) }
+    hadoopFlushMethod.foreach {
+      _.invoke(stream)
     }
   }
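The writer now always goes through HdfsUtils.getOutputStream, so flush() no longer needs the local-vs-HDFS split; the hadoopFlushMethod it invokes is only partially visible in this diff. A plausible reconstruction of that reflective lookup, assuming the intent is to prefer Hadoop 2.x's hflush and fall back to the older sync so one binary works against either API:

import scala.util.Try
import org.apache.hadoop.fs.FSDataOutputStream

object FlushMethodSketch {
  // Resolve hflush (Hadoop 2.x) reflectively, falling back to sync
  // (Hadoop 1.x); None if neither exists, in which case flush is a no-op.
  lazy val hadoopFlushMethod: Option[java.lang.reflect.Method] = {
    val cls = classOf[FSDataOutputStream]
    Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption
  }

  def flush(stream: FSDataOutputStream): Unit = {
    hadoopFlushMethod.foreach(_.invoke(stream))
  }
}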