Shutdown spark context after tests. Formatting/minor fixes #22

Merged: 1 commit, Oct 27, 2014

HDFSBackedBlockRDD.scala

```diff
@@ -20,17 +20,17 @@ import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.BlockRDD
 import org.apache.spark.storage.{BlockId, StorageLevel}
 import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, HdfsUtils, WriteAheadLogRandomReader}
 import org.apache.spark._
 
 private[streaming]
 class HDFSBackedBlockRDDPartition(
-    val blockId: BlockId, idx: Int, val segment: WriteAheadLogFileSegment) extends Partition {
-  val index = idx
-}
+    val blockId: BlockId,
+    val index: Int,
+    val segment: WriteAheadLogFileSegment
+  ) extends Partition
 
 private[streaming]
 class HDFSBackedBlockRDD[T: ClassTag](
```
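
The partition cleanup above leans on a Scala feature worth spelling out: a constructor parameter declared as `val` can directly implement an abstract member, so the separate `val index = idx` assignment (and the class body holding it) becomes unnecessary. A minimal sketch of the idea; the `Slot` trait and classes here are illustrative, not from the patch:

```scala
// A constructor `val` can implement an abstract member directly.
trait Slot { def index: Int }

// Before-style: accept a plain parameter, then copy it into a field.
class SlotBefore(idx: Int) extends Slot {
  val index = idx
}

// After-style: the parameter itself is the implementing val; no body needed.
class SlotAfter(val index: Int) extends Slot
```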

```diff
@@ -42,13 +42,12 @@ class HDFSBackedBlockRDD[T: ClassTag](
     val storageLevel: StorageLevel
   ) extends BlockRDD[T](sc, blockIds) {
 
-  if (blockIds.length != segments.length) {
-    throw new IllegalStateException("Number of block ids must be the same as number of segments!")
-  }
+  require(blockIds.length == segments.length,
+    "Number of block ids must be the same as number of segments!")
 
   // Hadoop Configuration is not serializable, so broadcast it as a serializable.
   val broadcastedHadoopConf = sc.broadcast(new SerializableWritable(hadoopConfiguration))
     .asInstanceOf[Broadcast[SerializableWritable[Configuration]]]
 
   override def getPartitions: Array[Partition] = {
     assertValid()
     (0 until blockIds.size).map { i =>
```
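
Replacing the explicit `throw new IllegalStateException` with `require` is the idiomatic Scala way to state a constructor precondition; `require` throws an `IllegalArgumentException` (with the message prefixed by "requirement failed:") when the predicate is false. Note this also changes the exception type, arguably a better fit for bad constructor arguments. A small self-contained sketch with hypothetical names:

```scala
// Sketch: `require` as a one-line constructor precondition.
class PairedInputs(ids: Seq[Int], names: Seq[String]) {
  require(ids.length == names.length,
    "Number of ids must be the same as number of names!")
}

object RequireDemo {
  def main(args: Array[String]): Unit = {
    new PairedInputs(Seq(1, 2), Seq("a", "b")) // passes the check
    try {
      new PairedInputs(Seq(1), Seq("a", "b")) // fails the check
    } catch {
      // Prints: requirement failed: Number of ids must be the same as number of names!
      case e: IllegalArgumentException => println(e.getMessage)
    }
  }
}
```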
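
The broadcast in this hunk works around Hadoop's `Configuration` not implementing `java.io.Serializable`: Spark's `SerializableWritable` wrapper serializes any `Writable` through Hadoop's own write/readFields machinery, so the wrapped configuration can be shipped to executors. A sketch of the pattern as it existed in the Spark 1.x era; the helper name is hypothetical:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SerializableWritable, SparkContext}

// Broadcast a non-serializable Hadoop Configuration by wrapping it first.
def broadcastHadoopConf(
    sc: SparkContext,
    conf: Configuration): Broadcast[SerializableWritable[Configuration]] = {
  sc.broadcast(new SerializableWritable(conf))
}

// On an executor, unwrap with two `.value` calls:
//   val conf: Configuration = broadcasted.value.value
```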

HDFSBackedBlockRDDSuite.scala

```diff
@@ -19,17 +19,18 @@ package org.apache.spark.streaming.rdd
 import java.io.File
 import java.util.concurrent.atomic.AtomicInteger
 
-import org.apache.spark.{SparkConf, SparkContext}
-
 import scala.collection.mutable.ArrayBuffer
-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite}
 
 import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
 import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogWriter}
+import org.apache.spark.{SparkConf, SparkContext}
 
-class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter {
+class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll {
   val conf = new SparkConf()
     .setMaster("local[2]")
     .setAppName(this.getClass.getSimpleName)
```

```diff
@@ -51,6 +52,13 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter {
     dir.delete()
   }
 
+  override def afterAll(): Unit = {
+    // Copied from LocalSparkContext which can't be imported since spark-core test-jar does not
+    // get imported properly by sbt even if it is created.
+    sparkContext.stop()
+    System.clearProperty("spark.driver.port")
+  }
+
   test("Data available in BM and HDFS") {
     doTestHDFSBackedRDD(5, 5, 20, 5)
   }
```
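
This `afterAll` hook is the substance of the PR: without it, the suite's SparkContext outlives the tests, and the stale `spark.driver.port` system property can break the next suite running in the same JVM. A self-contained sketch of the teardown pattern; the suite name and test body are hypothetical:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FunSuite}

// Sketch: one SparkContext for the whole suite, stopped exactly once
// after every test has run.
class ExampleSuite extends FunSuite with BeforeAndAfterAll {

  private val sc = new SparkContext(
    new SparkConf().setMaster("local[2]").setAppName("ExampleSuite"))

  override def afterAll(): Unit = {
    sc.stop()
    // A stopped context leaves spark.driver.port set; clear it so the
    // next suite in the same JVM can start its own context cleanly.
    System.clearProperty("spark.driver.port")
  }

  test("count elements of an RDD") {
    assert(sc.parallelize(1 to 10).count() === 10)
  }
}
```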

```diff
@@ -70,8 +78,8 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter {
   /**
    * Write a bunch of events into the HDFS Block RDD. Put a part of all of them to the
    * BlockManager, so all reads need not happen from HDFS.
-   * @param total - Total number of Strings to write
-   * @param blockCount - Number of blocks to write (therefore, total # of events per block =
+   * @param total Total number of Strings to write
+   * @param blockCount Number of blocks to write (therefore, total # of events per block =
    *                   total/blockCount
    */
   private def doTestHDFSBackedRDD(
```

```diff
@@ -81,8 +89,7 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter {
       blockCount: Int
     ) {
     val countPerBlock = total / blockCount
-    val blockIds = (0 until blockCount).map {
-      i =>
+    val blockIds = (0 until blockCount).map { i =>
       StreamBlockId(idGenerator.incrementAndGet(), idGenerator.incrementAndGet())
     }
 
```


```diff
@@ -95,16 +102,17 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter {
       }
     }
 
-    val segments = new ArrayBuffer[WriteAheadLogFileSegment]
-    if (writeToHDFSCount != 0) {
-      // Generate some fake segments for the blocks in BM so the RDD does not complain
-      segments ++= generateFakeSegments(writeToBMCount)
-      segments ++= writeDataToHDFS(writtenStrings.slice(writeToBMCount, blockCount),
-        blockIds.slice(writeToBMCount, blockCount))
-
-    } else {
-      segments ++= generateFakeSegments(blockCount)
+    val segments = {
+      if (writeToHDFSCount != 0) {
+        // Generate some fake segments for the blocks in BM so the RDD does not complain
+        generateFakeSegments(writeToBMCount) ++
+          writeDataToHDFS(writtenStrings.slice(writeToBMCount, blockCount),
+            blockIds.slice(writeToBMCount, blockCount))
+      } else {
+        generateFakeSegments(blockCount)
+      }
     }
 
     val rdd = new HDFSBackedBlockRDD[String](sparkContext, hadoopConf, blockIds.toArray,
       segments.toArray, false, StorageLevel.MEMORY_ONLY)
```
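
The `segments` rewrite above trades a mutable `ArrayBuffer` built up with `++=` for a single expression: in Scala, `if`/`else` yields a value, so the whole block can initialize the val in one step. A tiny illustration of the two styles, with hypothetical helpers:

```scala
import scala.collection.mutable.ArrayBuffer

// Mutable style: allocate a buffer, append in each branch, then read it back.
def labelsMutable(n: Int): Seq[String] = {
  val buf = new ArrayBuffer[String]
  if (n > 0) {
    buf ++= (1 to n).map(i => s"item-$i")
  } else {
    buf += "empty"
  }
  buf
}

// Expression style: the if/else itself is the value; no mutation needed.
def labelsExpr(n: Int): Seq[String] = {
  if (n > 0) {
    (1 to n).map(i => s"item-$i")
  } else {
    Seq("empty")
  }
}
```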


```diff
@@ -116,10 +124,9 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter {
   /**
    * Write data to HDFS and get a list of Seq of Seqs in which each Seq represents the data that
    * went into one block.
-   * @param count - Number of Strings to write
-   * @param countPerBlock - Number of Strings per block
-   * @return - Tuple of (Seq of Seqs, each of these Seqs is one block, Seq of WriteAheadLogFileSegments,
-   *           each representing the block being written to HDFS.
+   * @param count Number of Strings to write
+   * @param countPerBlock Number of Strings per block
+   * @return Seq of Seqs, each of these Seqs is one block
    */
   private def generateData(
       count: Int,
```

```diff
@@ -130,8 +137,8 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter {
   }
 
   private def writeDataToHDFS(
-    blockData: Seq[Seq[String]],
-    blockIds: Seq[BlockId]
+      blockData: Seq[Seq[String]],
+      blockIds: Seq[BlockId]
     ): Seq[WriteAheadLogFileSegment] = {
     assert(blockData.size === blockIds.size)
     val segments = new ArrayBuffer[WriteAheadLogFileSegment]()
```