
Commit 73d6f6d

Cleaned up tests a bit. Added some docs in multiple places.
1 parent 65b76b4 commit 73d6f6d


4 files changed: +48 / -19 lines


external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala

Lines changed: 9 additions & 0 deletions
@@ -31,6 +31,15 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
  * @param transactionTimeout Timeout in millis after which the transaction if not acked by Spark
  * is rolled back.
  */
+// Flume forces transactions to be thread-local. So each transaction *must* be committed, or
+// rolled back from the thread it was originally created in. So each getEvents call from Spark
+// creates a TransactionProcessor which runs in a new thread, in which the transaction is created
+// and events are pulled off the channel. Once the events are sent to Spark,
+// that thread is blocked and the TransactionProcessor is saved in a map,
+// until an ACK or NACK comes back or the transaction times out (after the specified timeout).
+// When the response comes or a timeout is hit, the TransactionProcessor is retrieved and then
+// unblocked, at which point the transaction is committed or rolled back.
 private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
   val transactionTimeout: Int, val backOffInterval: Int) extends SparkFlumeProtocol with Logging {
   val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
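
The comment added above is the heart of this change: because Flume transactions are thread-local, each batch must be committed or rolled back from the same thread that opened it. As a rough, self-contained sketch of that pattern (not the actual Spark implementation; Worker, inFlight, and the latch-based blocking are invented here purely for illustration), a callback handler could look like this:

import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executors, TimeUnit}

// Hypothetical sketch of the "one transaction per thread" pattern described in the comment.
// Each batch gets its own worker thread; the worker parks until an ACK/NACK arrives (or a
// timeout expires), so commit/rollback happens on the thread that opened the transaction.
object ThreadLocalTxnSketch {

  class Worker(seqNum: String, timeoutMillis: Long) extends Runnable {
    private val verdict = new CountDownLatch(1)
    @volatile private var acked = false

    def ack(): Unit = { acked = true; verdict.countDown() }
    def nack(): Unit = { acked = false; verdict.countDown() }

    override def run(): Unit = {
      // 1. Open the transaction and pull events off the channel here (elided).
      // 2. Block until an ACK/NACK arrives or the timeout is hit.
      val answered = verdict.await(timeoutMillis, TimeUnit.MILLISECONDS)
      // 3. Commit or roll back on *this* thread, which is what Flume requires.
      if (answered && acked) println(s"$seqNum: commit") else println(s"$seqNum: rollback")
    }
  }

  private val executor = Executors.newFixedThreadPool(4)
  private val inFlight = new ConcurrentHashMap[String, Worker]()

  def getEventBatch(seqNum: String): Unit = {
    val worker = new Worker(seqNum, timeoutMillis = 5000)
    inFlight.put(seqNum, worker)          // saved so a later ACK/NACK can find it
    executor.submit(worker)
  }

  def onAck(seqNum: String): Unit = Option(inFlight.remove(seqNum)).foreach(_.ack())
  def onNack(seqNum: String): Unit = Option(inFlight.remove(seqNum)).foreach(_.nack())
}

The real handler keeps TransactionProcessor instances rather than this toy Worker, but the shape is the same: a map keyed by sequence number plus one parked thread per outstanding batch.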

external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala

Lines changed: 16 additions & 8 deletions
@@ -36,15 +36,23 @@ import org.apache.flume.sink.AbstractSink
  * if an ACK is not received from Spark within that time
  * threads - Number of threads to use to receive requests from Spark (Default: 10)
  *
+ * This sink is unlike other Flume sinks in the sense that it does not push data;
+ * instead, the process method in this sink simply blocks the SinkRunner the first time it is
+ * called. This sink starts up an Avro IPC server that uses the SparkFlumeProtocol.
+ *
+ * Each time a getEventBatch call comes, it creates a transaction and reads events
+ * from the channel. When enough events are read, the events are sent to the Spark receiver and
+ * the thread itself is blocked and a reference to it saved off.
+ *
+ * When the ack for that batch is received,
+ * the thread which created the transaction is retrieved and it commits the transaction with the
+ * channel from the same thread it was originally created in (since Flume transactions are
+ * thread local). If a nack is received instead, the sink rolls back the transaction. If no ack
+ * is received within the specified timeout, the transaction is rolled back too. If an ack comes
+ * after that, it is simply ignored and the events get re-sent.
+ *
  */
-// Flume forces transactions to be thread-local. So each transaction *must* be committed, or
-// rolled back from the thread it was originally created in. So each getEvents call from Spark
-// creates a TransactionProcessor which runs in a new thread, in which the transaction is created
-// and events are pulled off the channel. Once the events are sent to spark,
-// that thread is blocked and the TransactionProcessor is saved in a map,
-// until an ACK or NACK comes back or the transaction times out (after the specified timeout).
-// When the response comes, the TransactionProcessor is retrieved and then unblocked,
-// at which point the transaction is committed or rolled back.
+
 private[flume]
 class SparkSink extends AbstractSink with Logging with Configurable {
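
The new scaladoc describes a sink that never pushes data from process(); it only parks the SinkRunner while an Avro IPC server hands batches to the receiver. A minimal sketch of that shape, using only the stock Flume sink API (this is not the real SparkSink, and the RPC server wiring is elided):

import java.util.concurrent.CountDownLatch

import org.apache.flume.{Context, Sink}
import org.apache.flume.conf.Configurable
import org.apache.flume.sink.AbstractSink

// Sketch of a "pull"-style sink: process() moves no data itself, it just blocks the
// SinkRunner thread; data leaves the channel through an RPC server started in start().
class BlockingPullSinkSketch extends AbstractSink with Configurable {

  private val blockLatch = new CountDownLatch(1)

  override def configure(ctx: Context): Unit = {
    // Read port / threads / transaction timeout from ctx here.
  }

  override def start(): Unit = {
    // Start the Avro IPC server backed by getChannel() here (elided).
    super.start()
  }

  override def stop(): Unit = {
    blockLatch.countDown()   // release the SinkRunner so Flume can shut down cleanly
    super.stop()
  }

  override def process(): Sink.Status = {
    blockLatch.await()       // the first call blocks until stop(); no events are taken here
    Sink.Status.BACKOFF
  }
}

The only interesting part here is that process() blocks; the batch lifecycle itself lives in the callback handler described in the previous file.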

external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala

Lines changed: 7 additions & 0 deletions
@@ -213,6 +213,13 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
     charSeqMap
   }

+  /**
+   * When the thread is started it sets as many events as the batch size or less (if enough
+   * events aren't available) into the eventBatch object and lets any threads waiting on the
+   * [[getEventBatch]] method proceed. Then this thread waits for acks or nacks to come in,
+   * or for a specified timeout, and commits or rolls back the transaction.
+   * @return
+   */
   override def call(): Void = {
     populateEvents()
     processAckOrNack()
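
The new scaladoc on call() describes two phases: stage a batch and wake up whoever asked for it, then wait (bounded by a timeout) for the verdict. An illustrative Callable with that two-latch shape, using invented names (TxnProcessorSketch, awaitBatch) rather than the real TransactionProcessor internals:

import java.util.concurrent.{Callable, CountDownLatch, TimeUnit}

// Illustrative two-phase Callable: fill a batch and release waiters, then wait
// (with a timeout) for an ACK/NACK before committing or rolling back.
class TxnProcessorSketch(batchSize: Int, timeoutSecs: Long) extends Callable[Void] {

  private val batchReady = new CountDownLatch(1)   // released once events are staged
  private val verdict = new CountDownLatch(1)      // released by ack() or nack()
  @volatile private var acked = false

  def ack(): Unit = { acked = true; verdict.countDown() }
  def nack(): Unit = { acked = false; verdict.countDown() }

  def awaitBatch(): Unit = batchReady.await()      // what a getEventBatch caller would wait on

  override def call(): Void = {
    // Phase 1: take up to batchSize events off the channel (elided), then unblock waiters.
    batchReady.countDown()
    // Phase 2: wait for the verdict, or time out and treat the batch as NACKed.
    val answered = verdict.await(timeoutSecs, TimeUnit.SECONDS)
    if (answered && acked) { /* tx.commit() */ } else { /* tx.rollback() */ }
    null
  }
}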

external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala

Lines changed: 16 additions & 11 deletions
@@ -35,24 +35,28 @@ import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingContext}
 import org.apache.spark.streaming.flume.sink._

-class FlumePollingStreamSuite extends TestSuiteBase {
+class FlumePollingStreamSuite extends TestSuiteBase {

   val testPort = 9999
+  val batchCount = 5
+  val eventsPerBatch = 100
+  val totalEventsPerChannel = batchCount * eventsPerBatch
+  val channelCapacity = 5000

   test("flume polling test") {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
     val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
       FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort)),
-        StorageLevel.MEMORY_AND_DISK, 100, 1)
+        StorageLevel.MEMORY_AND_DISK, eventsPerBatch, 1)
     val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
       with SynchronizedBuffer[Seq[SparkFlumeEvent]]
     val outputStream = new TestOutputStream(flumeStream, outputBuffer)
     outputStream.register()

     // Start the channel and sink.
     val context = new Context()
-    context.put("capacity", "5000")
+    context.put("capacity", channelCapacity.toString)
     context.put("transactionCapacity", "1000")
     context.put("keep-alive", "0")
     val channel = new MemoryChannel()
@@ -77,15 +81,16 @@ import org.apache.spark.streaming.flume.sink._
     val ssc = new StreamingContext(conf, batchDuration)
     val addresses = Seq(testPort, testPort + 1).map(new InetSocketAddress("localhost", _))
     val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
-      FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK, 100, 5)
+      FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
+        eventsPerBatch, 5)
     val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
       with SynchronizedBuffer[Seq[SparkFlumeEvent]]
     val outputStream = new TestOutputStream(flumeStream, outputBuffer)
     outputStream.register()

     // Start the channel and sink.
     val context = new Context()
-    context.put("capacity", "5000")
+    context.put("capacity", channelCapacity.toString)
     context.put("transactionCapacity", "1000")
     context.put("keep-alive", "0")
     val channel = new MemoryChannel()
@@ -127,7 +132,7 @@ import org.apache.spark.streaming.flume.sink._
       executorCompletion.take()
     }
     val startTime = System.currentTimeMillis()
-    while (outputBuffer.size < 5 &&
+    while (outputBuffer.size < batchCount * channels.size &&
       System.currentTimeMillis() - startTime < 15000) {
       logInfo("output.size = " + outputBuffer.size)
       Thread.sleep(100)
@@ -138,9 +143,9 @@ import org.apache.spark.streaming.flume.sink._
     ssc.stop()

     val flattenedBuffer = outputBuffer.flatten
-    assert(flattenedBuffer.size === 25 * channels.size)
+    assert(flattenedBuffer.size === totalEventsPerChannel * channels.size)
     var counter = 0
-    for (k <- 0 until channels.size; i <- 0 until 25) {
+    for (k <- 0 until channels.size; i <- 0 until totalEventsPerChannel) {
       val eventToVerify = EventBuilder.withBody((channels(k).getName + " - " +
         String.valueOf(i)).getBytes("utf-8"),
         Map[String, String]("test-" + i.toString -> "header"))
@@ -157,7 +162,7 @@ import org.apache.spark.streaming.flume.sink._
         j += 1
       }
     }
-    assert(counter === 25 * channels.size)
+    assert(counter === totalEventsPerChannel * channels.size)
   }

   def assertChannelIsEmpty(channel: MemoryChannel) = {
@@ -170,10 +175,10 @@ import org.apache.spark.streaming.flume.sink._
   private class TxnSubmitter(channel: MemoryChannel, clock: ManualClock) extends Callable[Void] {
     override def call(): Void = {
       var t = 0
-      for (i <- 0 until 5) {
+      for (i <- 0 until batchCount) {
         val tx = channel.getTransaction
         tx.begin()
-        for (j <- 0 until 5) {
+        for (j <- 0 until eventsPerBatch) {
           channel.put(EventBuilder.withBody((channel.getName + " - " + String.valueOf(t)).getBytes(
             "utf-8"),
             Map[String, String]("test-" + t.toString -> "header")))
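
The test refactor above replaces the magic numbers 5, 100, 5000 and 25 with named constants, so the assertions now follow directly from batchCount * eventsPerBatch per channel. A quick standalone sanity check of that arithmetic (not part of the suite; the channel count of 2 is taken from the two-sink test case):

// Standalone check of the counts the refactored test asserts on.
object FlumePollingCounts extends App {
  val batchCount = 5
  val eventsPerBatch = 100
  val totalEventsPerChannel = batchCount * eventsPerBatch      // 500
  val channels = 2                                             // the two-sink test case
  assert(totalEventsPerChannel * channels == 1000)             // expected flattenedBuffer.size
  println(s"expecting ${totalEventsPerChannel * channels} events across $channels channels")
}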
