Skip to content

Commit 8bcd6c3

Browse files
committed
add logging for test
1 parent 83a0f13 commit 8bcd6c3

File tree

4 files changed

+11
-5
lines changed

4 files changed

+11
-5
lines changed

extras/kinesis-asl/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,12 @@
7070
<version>${aws.kinesis.producer.version}</version>
7171
<scope>test</scope>
7272
</dependency>
73+
<dependency>
74+
<groupId>com.google.protobuf</groupId>
75+
<artifactId>protobuf-java</artifactId>
76+
<version>2.6.1</version>
77+
<scope>test</scope>
78+
</dependency>
7379
<dependency>
7480
<groupId>org.mockito</groupId>
7581
<artifactId>mockito-core</artifactId>

extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ private[kinesis] class KinesisRecordProcessor[T](
7070
* in the DStream
7171
*/
7272
override def processRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer) {
73+
logInfo(s"Received batch: $batch")
7374
if (!receiver.isStopped()) {
7475
try {
7576
receiver.addRecords(shardId, batch)

extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
7878
if (testUtils != null) {
7979
// Delete the Kinesis stream as well as the DynamoDB table generated by
8080
// Kinesis Client Library when consuming the stream
81-
testUtils.deleteStream()
82-
testUtils.deleteDynamoDBTable(appName)
81+
// testUtils.deleteStream()
82+
// testUtils.deleteDynamoDBTable(appName)
8383
}
8484
}
8585

@@ -182,7 +182,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
182182
val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
183183
stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
184184
collected ++= rdd.collect()
185-
logInfo("Collected = " + rdd.collect().toSeq.mkString(", "))
185+
logInfo("Collected = " + collected.mkString(", "))
186186
}
187187
ssc.start()
188188

@@ -207,7 +207,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
207207
val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
208208
stream.foreachRDD { rdd =>
209209
collected ++= rdd.collect()
210-
logInfo("Collected = " + rdd.collect().toSeq.mkString(", "))
210+
logInfo("Collected = " + collected.mkString(", "))
211211
}
212212
ssc.start()
213213

extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,6 @@ private[kinesis] class KinesisTestUtils extends Logging {
139139
}
140140
}
141141

142-
143142
logInfo(s"Pushed $testData:\n\t ${shardIdToSeqNumbers.mkString("\n\t")}")
144143
shardIdToSeqNumbers.toMap
145144
}

0 commit comments

Comments
 (0)