Commit c86d615

[SPARK-3054][STREAMING] Add unit tests for Spark Sink.
This patch adds unit tests for the Spark Sink. It also removes the private[flume] modifier from SparkSink, since the sink is instantiated from a Flume configuration by class name (the modifier appears to be ignored by the reflection Flume uses, but it should be removed anyway).
1 parent eaeb0f7 commit c86d615

4 files changed, +227 -2 lines changed

4 files changed

+227
-2
lines changed
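For context on the private[flume] removal described above: a Spark sink is wired into a Flume agent by naming the class in the agent configuration, and Flume instantiates it reflectively from that entry. A minimal sketch of such a configuration (the agent name, channel name, hostname, and port below are illustrative placeholders, not part of this commit):

agent.sinks = spark
agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.spark.hostname = localhost
agent.sinks.spark.port = 9999
agent.sinks.spark.channel = memoryChannel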

external/flume-sink/pom.xml

Lines changed: 7 additions & 0 deletions
@@ -72,6 +72,13 @@
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest_${scala.binary.version}</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope> <!-- Need it only for tests, don't package it -->
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
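The test-jar dependency added above is needed only at test scope: it puts spark-streaming's test utilities (notably TestSuiteBase, which the new SparkSinkSuite below extends) on the test classpath without packaging them into the flume-sink artifact.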

external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala

Lines changed: 8 additions & 1 deletion
@@ -53,7 +53,6 @@ import org.apache.flume.sink.AbstractSink
  *
  */

-private[flume]
 class SparkSink extends AbstractSink with Logging with Configurable {

   // Size of the pool to use for holding transaction processors.
@@ -131,6 +130,14 @@ class SparkSink extends AbstractSink with Logging with Configurable {
     blockingLatch.await()
     Status.BACKOFF
   }
+
+  private[flume] def getPort(): Int = {
+    serverOpt
+      .map(_.getPort)
+      .getOrElse(
+        throw new RuntimeException("Server was not started!")
+      )
+  }
 }

 /**
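The new tests configure the sink with port 0, so the Avro server binds to an ephemeral port and getPort() is how a test discovers the port that was actually chosen. A rough, illustrative sketch of that usage (not part of the commit; names and setup are assumptions mirroring the suite below):

  // Illustrative only: bind the sink to an ephemeral port, then query it.
  val channel = new org.apache.flume.channel.MemoryChannel()
  channel.configure(new org.apache.flume.Context())
  channel.start()

  val sink = new SparkSink()
  val ctx = new org.apache.flume.Context()
  ctx.put(SparkSinkConfig.CONF_HOSTNAME, "0.0.0.0")
  ctx.put(SparkSinkConfig.CONF_PORT, "0") // 0 asks the OS for a free port
  sink.configure(ctx)
  sink.setChannel(channel)
  sink.start()

  val boundPort = sink.getPort() // throws if start() was never called
  // ... connect an Avro client to ("0.0.0.0", boundPort), then stop the sink and channel.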
external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala

Lines changed: 211 additions & 0 deletions
@@ -0,0 +1,211 @@
+package org.apache.spark.streaming.flume.sink
+
+import java.net.InetSocketAddress
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.{CountDownLatch, Executors}
+
+import scala.collection.JavaConversions._
+import scala.concurrent.{Promise, Future}
+import scala.util.{Failure, Success, Try}
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.apache.avro.ipc.NettyTransceiver
+import org.apache.avro.ipc.specific.SpecificRequestor
+import org.apache.flume.Context
+import org.apache.flume.channel.MemoryChannel
+import org.apache.flume.event.EventBuilder
+import org.apache.spark.streaming.TestSuiteBase
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+class SparkSinkSuite extends TestSuiteBase {
+  val batchCount = 5
+  val eventsPerBatch = 100
+  val totalEventsPerChannel = batchCount * eventsPerBatch
+  val channelCapacity = 5000
+  val maxAttempts = 5
+
+  test("Success test") {
+    val (channel, sink) = initializeChannelAndSink(None)
+    channel.start()
+    sink.start()
+
+    putEvents(channel, 1000)
+
+    val port = sink.getPort
+    val address = new InetSocketAddress("0.0.0.0", port)
+
+    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
+    val events = client.getEventBatch(1000)
+    client.ack(events.getSequenceNumber)
+    assert(events.getEvents.size() === 1000)
+    assertChannelIsEmpty(channel)
+    sink.stop()
+    channel.stop()
+    transceiver.close()
+  }
+
+  test("Nack") {
+    val (channel, sink) = initializeChannelAndSink(None)
+    channel.start()
+    sink.start()
+    putEvents(channel, 1000)
+
+    val port = sink.getPort
+    val address = new InetSocketAddress("0.0.0.0", port)
+
+    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
+    val events = client.getEventBatch(1000)
+    assert(events.getEvents.size() === 1000)
+    client.nack(events.getSequenceNumber)
+    assert(availableChannelSlots(channel) === 4000)
+    sink.stop()
+    channel.stop()
+    transceiver.close()
+  }
+
+  test("Timeout") {
+    val (channel, sink) = initializeChannelAndSink(Option(Map(SparkSinkConfig
+      .CONF_TRANSACTION_TIMEOUT -> 1.toString)))
+    channel.start()
+    sink.start()
+    putEvents(channel, 1000)
+    val port = sink.getPort
+    val address = new InetSocketAddress("0.0.0.0", port)
+
+    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
+    val events = client.getEventBatch(1000)
+    assert(events.getEvents.size() === 1000)
+    Thread.sleep(1000)
+    assert(availableChannelSlots(channel) === 4000)
+    sink.stop()
+    channel.stop()
+    transceiver.close()
+  }
+
+  test("Multiple consumers") {
+    multipleClients(failSome = false)
+  }
+
+  test("Multiple consumers With Some Failures") {
+    multipleClients(failSome = true)
+  }
+
+  def multipleClients(failSome: Boolean): Unit = {
+    import scala.concurrent.ExecutionContext.Implicits.global
+    val (channel, sink) = initializeChannelAndSink(None)
+    channel.start()
+    sink.start()
+    (1 to 5).map(_ => putEvents(channel, 1000))
+    val port = sink.getPort
+    val address = new InetSocketAddress("0.0.0.0", port)
+
+    val transAndClient = getTransceiverAndClient(address, 5)
+    val batchCounter = new CountDownLatch(5)
+    val counter = new AtomicInteger(0)
+    transAndClient.foreach(x => {
+      val promise = Promise[EventBatch]()
+      val future = promise.future
+      Future {
+        val client = x._2
+        var events: EventBatch = null
+        Try {
+          events = client.getEventBatch(1000)
+          if (!failSome || counter.incrementAndGet() % 2 == 0) {
+            client.ack(events.getSequenceNumber)
+          } else {
+            client.nack(events.getSequenceNumber)
+          }
+        }.map(_ => promise.success(events)).recover({
+          case e => promise.failure(e)
+        })
+      }
+      future.onComplete {
+        case Success(events) => assert(events.getEvents.size() === 1000)
+          batchCounter.countDown()
+        case Failure(t) =>
+          batchCounter.countDown()
+          throw t
+      }
+    })
+    batchCounter.await()
+    if (failSome) {
+      assert(availableChannelSlots(channel) === 2000)
+    } else {
+      assertChannelIsEmpty(channel)
+    }
+    sink.stop()
+    channel.stop()
+    transAndClient.foreach(x => x._1.close())
+  }
+
+  def initializeChannelAndSink(overrides: Option[Map[String, String]]):
+    (MemoryChannel, SparkSink) = {
+    val channel = new MemoryChannel()
+    val channelContext = new Context()
+
+    channelContext.put("capacity", channelCapacity.toString)
+    channelContext.put("transactionCapacity", 1000.toString)
+    channelContext.put("keep-alive", 0.toString)
+    overrides.foreach(channelContext.putAll(_))
+    channel.configure(channelContext)
+
+    val sink = new SparkSink()
+    val sinkContext = new Context()
+    sinkContext.put(SparkSinkConfig.CONF_HOSTNAME, "0.0.0.0")
+    sinkContext.put(SparkSinkConfig.CONF_PORT, 0.toString)
+    sink.configure(sinkContext)
+    sink.setChannel(channel)
+    (channel, sink)
+  }
+
+  private def putEvents(ch: MemoryChannel, count: Int): Unit = {
+    val tx = ch.getTransaction
+    tx.begin()
+    (1 to count).map(x => ch.put(EventBuilder.withBody(x.toString.getBytes)))
+    tx.commit()
+    tx.close()
+  }
+
+  private def getTransceiverAndClient(address: InetSocketAddress, count: Int):
+    Seq[(NettyTransceiver, SparkFlumeProtocol.Callback)] = {
+
+    (1 to count).map(_ => {
+      lazy val channelFactoryExecutor =
+        Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
+          setNameFormat("Flume Receiver Channel Thread - %d").build())
+      lazy val channelFactory =
+        new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor)
+      val transceiver = new NettyTransceiver(address, channelFactory)
+      val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
+      (transceiver, client)
+    })
+  }
+
+  private def assertChannelIsEmpty(channel: MemoryChannel) = {
+    assert(availableChannelSlots(channel) === 5000)
+  }
+
+  private def availableChannelSlots(channel: MemoryChannel): Int = {
+    val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
+    queueRemaining.setAccessible(true)
+    val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
+    m.invoke(queueRemaining.get(channel)).asInstanceOf[Int]
+  }
+
+}
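The channel-slot assertions in the suite follow from channelCapacity = 5000. In the Nack and Timeout tests the single un-acked batch of 1000 events is rolled back into the channel, so 5000 - 1000 = 4000 slots remain free. In multipleClients with failSome = true, five batches of 1000 events are staged; the two clients that see an even counter value ack and the other three nack, so 3000 events stay in the channel and 5000 - 3000 = 2000 slots are free. When every batch is acked the channel drains completely and assertChannelIsEmpty sees all 5000 slots available.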

external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -201,7 +201,7 @@ class FlumePollingStreamSuite extends TestSuiteBase {
   }

   def assertChannelIsEmpty(channel: MemoryChannel) = {
-    val queueRemaining = channel.getClass.getDeclaredField("queueRemaining");
+    val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
     queueRemaining.setAccessible(true)
     val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
     assert(m.invoke(queueRemaining.get(channel)).asInstanceOf[Int] === 5000)
