
Commit 4acf47e

changed debug's to infos
1 parent 8bcd6c3 commit 4acf47e

4 files changed, +7 -6 lines

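For context, these logDebug/logInfo calls go through Spark's Logging trait, which takes the message by name and only builds the interpolated string when the corresponding level is enabled. A minimal sketch of that pattern (a simplified stand-in assuming an SLF4J backend, as Spark itself uses; not Spark's actual trait):

  import org.slf4j.{Logger, LoggerFactory}

  trait SimpleLogging {
    private lazy val log: Logger = LoggerFactory.getLogger(getClass.getName)

    // The message is passed by name, so the interpolated string is only
    // constructed if the corresponding level is enabled.
    protected def logDebug(msg: => String): Unit =
      if (log.isDebugEnabled) log.debug(msg)

    protected def logInfo(msg: => String): Unit =
      if (log.isInfoEnabled) log.info(msg)
  }

With Spark's default log4j root level of INFO, logDebug output is filtered out, which is why this commit promotes the messages below to logInfo.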

extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala

Lines changed: 1 addition & 1 deletion
@@ -99,7 +99,7 @@ class KinesisBackedBlockRDD[T: ClassTag](
     val blockId = partition.blockId

     def getBlockFromBlockManager(): Option[Iterator[T]] = {
-      logDebug(s"Read partition data of $this from block manager, block $blockId")
+      logInfo(s"Read partition data of $this from block manager, block $blockId")
       blockManager.get(blockId).map(_.data.asInstanceOf[Iterator[T]])
     }

extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala

Lines changed: 1 addition & 1 deletion
@@ -54,7 +54,7 @@ private[kinesis] class KinesisInputDStream[T: ClassTag](
       val seqNumRanges = blockInfos.map {
         _.metadataOption.get.asInstanceOf[SequenceNumberRanges] }.toArray
       val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
-      logDebug(s"Creating KinesisBackedBlockRDD for $time with ${seqNumRanges.length} " +
+      logInfo(s"Creating KinesisBackedBlockRDD for $time with ${seqNumRanges.length} " +
         s"seq number ranges: ${seqNumRanges.mkString(", ")} ")
       new KinesisBackedBlockRDD(
         context.sc, regionName, endpointUrl, blockIds, seqNumRanges,

extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala

Lines changed: 2 additions & 1 deletion
@@ -206,6 +206,7 @@ private[kinesis] class KinesisReceiver[T](
       val dataIterator = records.iterator().asScala.map(messageHandler)
       val metadata = SequenceNumberRange(streamName, shardId,
         records.get(0).getSequenceNumber(), records.get(records.size() - 1).getSequenceNumber())
+      logInfo(s"blockGenerator.multipleDataWithCallback: iterator: $dataIterator, meta: $metadata")
       blockGenerator.addMultipleDataWithCallback(dataIterator, metadata)

     }
@@ -231,7 +232,7 @@ private[kinesis] class KinesisReceiver[T](
   private def finalizeRangesForCurrentBlock(blockId: StreamBlockId): Unit = {
     blockIdToSeqNumRanges(blockId) = SequenceNumberRanges(seqNumRangesInCurrentBlock.toArray)
     seqNumRangesInCurrentBlock.clear()
-    logDebug(s"Generated block $blockId has $blockIdToSeqNumRanges")
+    logInfo(s"Generated block $blockId has $blockIdToSeqNumRanges")
   }

   /** Store the block along with its associated ranges */

extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala

Lines changed: 3 additions & 3 deletions
@@ -70,11 +70,11 @@ private[kinesis] class KinesisRecordProcessor[T](
    * in the DStream
    */
   override def processRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer) {
-    logInfo(s"Received batch: $batch")
+    logInfo(s"Received batch: $batch. Is receiver alive: ${!receiver.isStopped()}")
     if (!receiver.isStopped()) {
       try {
         receiver.addRecords(shardId, batch)
-        logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
+        logInfo(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")

         /*
          *
@@ -111,7 +111,7 @@ private[kinesis] class KinesisRecordProcessor[T](
          * more than once.
          */
         logError(s"Exception: WorkerId $workerId encountered and exception while storing " +
-          " or checkpointing a batch for workerId $workerId and shardId $shardId.", e)
+          s" or checkpointing a batch for workerId $workerId and shardId $shardId.", e)

         /* Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor. */
         throw e
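
Besides the level changes, the last hunk also adds the missing s interpolator on the second line of that logError message; without it, $workerId and $shardId on that line would be printed literally instead of being substituted. A tiny standalone Scala illustration of the difference (hypothetical values, not code from this file):

  object InterpolatorDemo {
    def main(args: Array[String]): Unit = {
      val workerId = "worker-1"              // hypothetical worker id
      val shardId = "shardId-000000000000"   // hypothetical shard id
      println(" a batch for workerId $workerId and shardId $shardId.")   // placeholders stay literal
      println(s" a batch for workerId $workerId and shardId $shardId.")  // values are substituted
    }
  }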
