[STREAMING] SPARK-1581: Allow One Flume Avro RPC Server for Each Worker ... #495

Status: Closed. Wants to merge 1 commit.
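
In short, as the diff below shows: host becomes Option[String] in FlumeInputDStream and FlumeReceiver. When the host is None, the receiver binds its Avro RPC server to 0.0.0.0 instead of a fixed hostname and reports no location preference, so the scheduler may place it on any worker; FlumeUtils gains a matching createStream overload that omits the hostname and passes None. The existing hostname-based createStream is kept as a second overload with its old behavior, including the default storage level.
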
FlumeInputDStream.scala

@@ -38,7 +38,7 @@ import org.apache.spark.streaming.dstream._
 private[streaming]
 class FlumeInputDStream[T: ClassTag](
   @transient ssc_ : StreamingContext,
-  host: String,
+  host: Option[String],
   port: Int,
   storageLevel: StorageLevel
 ) extends NetworkInputDStream[SparkFlumeEvent](ssc_) {

@@ -130,7 +130,7 @@ class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
  * Flume Avro interface. */
 private[streaming]
 class FlumeReceiver(
-  host: String,
+  host: Option[String],
   port: Int,
   storageLevel: StorageLevel
 ) extends NetworkReceiver[SparkFlumeEvent] {

@@ -140,7 +140,7 @@ class FlumeReceiver(
   protected override def onStart() {
     val responder = new SpecificResponder(
       classOf[AvroSourceProtocol], new FlumeEventServer(this))
-    val server = new NettyServer(responder, new InetSocketAddress(host, port))
+    val server = new NettyServer(responder, new InetSocketAddress(host.getOrElse("0.0.0.0"), port))
     blockGenerator.start()
     server.start()
     logInfo("Flume receiver started")

@@ -151,5 +151,5 @@ class FlumeReceiver(
     logInfo("Flume receiver stopped")
   }

-  override def getLocationPreference = Some(host)
+  override def getLocationPreference = host
 }
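
The mechanism of the patch is contained in the two changed lines above. A minimal standalone sketch (illustrative names, not part of the patch) of what the Option[String] host means in practice:

import java.net.InetSocketAddress

object HostOptionSketch {
  // None    => bind the wildcard address, so the Avro server accepts
  //            connections on whichever worker hosts the receiver.
  // Some(h) => bind only h, matching the original single-host behavior.
  def bindAddress(host: Option[String], port: Int): InetSocketAddress =
    new InetSocketAddress(host.getOrElse("0.0.0.0"), port)

  // Mirrors the getLocationPreference change: with no host there is no
  // preferred worker, so the receiver may be scheduled anywhere.
  def locationPreference(host: Option[String]): Option[String] = host

  def main(args: Array[String]): Unit = {
    println(bindAddress(None, 9999))              // /0.0.0.0:9999
    println(locationPreference(Some("worker-1"))) // Some(worker-1)
  }
}
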
FlumeUtils.scala

@@ -24,19 +24,38 @@ import org.apache.spark.streaming.dstream.DStream

 object FlumeUtils {
   /**
-   * Create an input stream from a Flume source.
+   * Create an input stream from a Flume source by starting an Avro RPC server on each worker.
    * @param ssc StreamingContext object
-   * @param hostname Hostname of the slave machine to which the flume data will be sent
-   * @param port Port of the slave machine to which the flume data will be sent
+   * @param port Port of the slave machines to which the flume data will be sent
    * @param storageLevel Storage level to use for storing the received objects
    */
   def createStream (
       ssc: StreamingContext,
-      hostname: String,
       port: Int,
-      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+      storageLevel: StorageLevel
     ): DStream[SparkFlumeEvent] = {
-    val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
+    val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, None, port, storageLevel)
     inputStream
   }

+  /**
+   * Create an input stream from a Flume source.
+   * @param ssc StreamingContext object
+   * @param hostname Hostname of the slave machine to which the flume data will be sent
+   * @param port Port of the slave machine to which the flume data will be sent
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def createStream (
+      ssc: StreamingContext,
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+    ): DStream[SparkFlumeEvent] = {
+    val inputStream = new FlumeInputDStream[SparkFlumeEvent](
+      ssc,
+      Some(hostname),
+      port,
+      storageLevel)
+    inputStream
+  }

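A sketch of how a driver would call the two overloads (host name, ports, and context settings are assumed for illustration; note that the hostname-free variant must pass a storage level explicitly, since it declares no default):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumeUsageSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[2]", "FlumeUsageSketch", Seconds(1))

    // New overload: no hostname, so the receiver can land on any worker and
    // bind its Avro RPC server to 0.0.0.0 there; storage level is required.
    val anyWorkerStream =
      FlumeUtils.createStream(ssc, 9999, StorageLevel.MEMORY_AND_DISK_SER_2)

    // Existing overload: pin the Avro RPC server to one slave machine
    // ("flume-target" is a placeholder); the storage level keeps its default.
    val pinnedStream = FlumeUtils.createStream(ssc, "flume-target", 9998)

    anyWorkerStream.union(pinnedStream).count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}
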
FlumeStreamSuite.scala

@@ -31,15 +31,14 @@ import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
 import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.streaming.dstream.DStream

 class FlumeStreamSuite extends TestSuiteBase {

-  val testPort = 9999

-  test("flume input stream") {
+  def testStream(flumeStream: DStream[SparkFlumeEvent], port: Int) {
     // Set up the streaming context and input streams
-    val ssc = new StreamingContext(conf, batchDuration)
-    val flumeStream = FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
+    val ssc = flumeStream.ssc
     val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
       with SynchronizedBuffer[Seq[SparkFlumeEvent]]
     val outputStream = new TestOutputStream(flumeStream, outputBuffer)

@@ -49,7 +48,7 @@ class FlumeStreamSuite extends TestSuiteBase {
     val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
     val input = Seq(1, 2, 3, 4, 5)
     Thread.sleep(1000)
-    val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
+    val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", port))
     val client = SpecificRequestor.getClient(
       classOf[AvroSourceProtocol], transceiver)

@@ -71,16 +70,33 @@
     val timeTaken = System.currentTimeMillis() - startTime
     assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
     logInfo("Stopping context")
+    transceiver.close()
     ssc.stop()

     val decoder = Charset.forName("UTF-8").newDecoder()

+    println(outputBuffer)
     assert(outputBuffer.size === input.length)
     for (i <- 0 until outputBuffer.size) {
+      println(outputBuffer(i))
       assert(outputBuffer(i).size === 1)
       val str = decoder.decode(outputBuffer(i).head.event.getBody)
       assert(str.toString === input(i).toString)
       assert(outputBuffer(i).head.event.getHeaders.get("test") === "header")
     }
   }

+  test("flume input stream") {
+    val ssc = new StreamingContext(conf, batchDuration)
+    val port = 9901
+    val flumeStream = FlumeUtils.createStream(ssc, "localhost", port, StorageLevel.MEMORY_AND_DISK)
+    testStream(flumeStream, port)
+  }
+
+  test("flume multi-worker input stream") {
+    val ssc = new StreamingContext(conf, batchDuration)
+    val port = 9902
+    val multiWorkerFlumeListeningStream = FlumeUtils.createStream(ssc, port, StorageLevel.MEMORY_AND_DISK)
+    testStream(multiWorkerFlumeListeningStream, port)
+  }
 }
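
The tests drive the receiver exactly the way a Flume Avro sink would. For reference, a standalone sketch of that client side, built from the same avro-ipc classes the suite imports (port and payload are placeholders mirroring the first test):

import java.net.InetSocketAddress
import java.nio.ByteBuffer
import scala.collection.JavaConverters._
import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor
import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}

object SendToFlumeReceiver {
  def main(args: Array[String]): Unit = {
    // Connect to the Avro RPC server started by the Flume receiver.
    val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", 9901))
    val client = SpecificRequestor.getClient(classOf[AvroSourceProtocol], transceiver)
    try {
      for (i <- 1 to 5) {
        val event = new AvroFlumeEvent()
        event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header").asJava)
        event.setBody(ByteBuffer.wrap(i.toString.getBytes("UTF-8")))
        client.append(event) // returns an avro Status, OK on success
      }
    } finally {
      transceiver.close()
    }
  }
}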