
[SPARK-11198][STREAMING][KINESIS] Support de-aggregation of records during recovery #9403

Closed
wants to merge 12 commits into from
6 changes: 6 additions & 0 deletions extras/kinesis-asl/pom.xml
@@ -64,6 +64,12 @@
<artifactId>aws-java-sdk</artifactId>
<version>${aws.java.sdk.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-producer</artifactId>
<version>${aws.kinesis.producer.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
@@ -23,6 +23,7 @@ import scala.util.control.NonFatal

import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord
import com.amazonaws.services.kinesis.model._

import org.apache.spark._
@@ -210,7 +211,10 @@ class KinesisSequenceRangeIterator(
s"getting records using shard iterator") {
client.getRecords(getRecordsRequest)
}
(getRecordsResult.getRecords.iterator().asScala, getRecordsResult.getNextShardIterator)
// De-aggregate records, if KPL was used in producing the records. The KCL automatically
// handles de-aggregation during regular operation. This code path is used during recovery
val recordIterator = UserRecord.deaggregate(getRecordsResult.getRecords)
(recordIterator.iterator().asScala, getRecordsResult.getNextShardIterator)
}

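The change above is the core of this patch: records fetched directly from Kinesis during recovery may still be KPL-aggregated, so they are passed through UserRecord.deaggregate before being handed to the RDD (during normal operation the KCL already does this before the receiver sees the records). A minimal standalone sketch of that recovery-path read, using only the KCL/SDK calls visible in the diff; the client, shard iterator, and batch limit are placeholders:

    import scala.collection.JavaConverters._

    import com.amazonaws.services.kinesis.AmazonKinesisClient
    import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord
    import com.amazonaws.services.kinesis.model.{GetRecordsRequest, Record}

    object DeaggregationSketch {
      // Fetch one batch of records from a shard and expand any KPL-aggregated records.
      // UserRecord extends Record, so callers that only read the data ByteBuffer are unaffected.
      def getDeaggregatedRecords(
          client: AmazonKinesisClient,
          shardIterator: String): (Iterator[Record], String) = {
        val request = new GetRecordsRequest()
          .withShardIterator(shardIterator)
          .withLimit(1000) // placeholder batch size
        val result = client.getRecords(request)
        // deaggregate() is a no-op for records that were not produced with the KPL.
        val userRecords = UserRecord.deaggregate(result.getRecords)
        (userRecords.iterator().asScala, result.getNextShardIterator)
      }
    }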
@@ -207,7 +207,6 @@ private[kinesis] class KinesisReceiver[T](
val metadata = SequenceNumberRange(streamName, shardId,
records.get(0).getSequenceNumber(), records.get(records.size() - 1).getSequenceNumber())
blockGenerator.addMultipleDataWithCallback(dataIterator, metadata)

}
}

@@ -110,7 +110,7 @@ private[kinesis] class KinesisRecordProcessor[T](
* more than once.
*/
logError(s"Exception: WorkerId $workerId encountered and exception while storing " +
" or checkpointing a batch for workerId $workerId and shardId $shardId.", e)
s" or checkpointing a batch for workerId $workerId and shardId $shardId.", e)

/* Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor. */
throw e
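The one-character change above adds the missing s interpolator to the second half of the log message; without it, the literal text "$workerId" and "$shardId" would be logged instead of their values. A tiny illustration (values are placeholders):

    val workerId = "worker-1"
    println(" for workerId $workerId.")   // prints the literal text: for workerId $workerId.
    println(s" for workerId $workerId.")  // prints the value:        for workerId worker-1.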
@@ -22,7 +22,8 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId}
import org.apache.spark.{SparkConf, SparkContext, SparkException}

class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll {
abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
extends KinesisFunSuite with BeforeAndAfterAll {

private val testData = 1 to 8

@@ -37,13 +38,12 @@ class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll
private var sc: SparkContext = null
private var blockManager: BlockManager = null


override def beforeAll(): Unit = {
runIfTestsEnabled("Prepare KinesisTestUtils") {
testUtils = new KinesisTestUtils()
testUtils.createStream()

shardIdToDataAndSeqNumbers = testUtils.pushData(testData)
shardIdToDataAndSeqNumbers = testUtils.pushData(testData, aggregate = aggregateTestData)
require(shardIdToDataAndSeqNumbers.size > 1, "Need data to be sent to multiple shards")

shardIds = shardIdToDataAndSeqNumbers.keySet.toSeq
@@ -247,3 +247,9 @@ class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll
Array.tabulate(num) { i => new StreamBlockId(0, i) }
}
}

class WithAggregationKinesisBackedBlockRDDSuite
extends KinesisBackedBlockRDDTests(aggregateTestData = true)

class WithoutAggregationKinesisBackedBlockRDDSuite
extends KinesisBackedBlockRDDTests(aggregateTestData = false)
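The two one-line suites above run the entire shared test body twice, once with KPL aggregation and once without. This is the usual ScalaTest pattern of an abstract suite parameterized through its constructor; a tiny, hypothetical illustration of the same idea (unrelated to Kinesis):

    import org.scalatest.FunSuite

    // The base suite takes its configuration as a constructor argument,
    // and each concrete subclass fixes one configuration.
    abstract class RoundTripTests(compress: Boolean) extends FunSuite {
      test(s"round trip (compress = $compress)") {
        val payload = "hello"
        // A real suite would exercise the code path selected by `compress` here.
        assert(payload.reverse.reverse === payload)
      }
    }

    class WithCompressionRoundTripSuite extends RoundTripTests(compress = true)
    class WithoutCompressionRoundTripSuite extends RoundTripTests(compress = false)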
@@ -39,7 +39,7 @@ import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
import org.apache.spark.util.Utils
import org.apache.spark.{SparkConf, SparkContext}

class KinesisStreamSuite extends KinesisFunSuite
abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFunSuite
with Eventually with BeforeAndAfter with BeforeAndAfterAll {

// This is the name that KCL will use to save metadata to DynamoDB
@@ -182,13 +182,13 @@ class KinesisStreamSuite extends KinesisFunSuite
val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
collected ++= rdd.collect()
logInfo("Collected = " + rdd.collect().toSeq.mkString(", "))
logInfo("Collected = " + collected.mkString(", "))
}
ssc.start()

val testData = 1 to 10
eventually(timeout(120 seconds), interval(10 second)) {
testUtils.pushData(testData)
testUtils.pushData(testData, aggregateTestData)
assert(collected === testData.toSet, "\nData received does not match data sent")
}
ssc.stop(stopSparkContext = false)
@@ -207,13 +207,13 @@ class KinesisStreamSuite extends KinesisFunSuite
val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
stream.foreachRDD { rdd =>
collected ++= rdd.collect()
logInfo("Collected = " + rdd.collect().toSeq.mkString(", "))
logInfo("Collected = " + collected.mkString(", "))
}
ssc.start()

val testData = 1 to 10
eventually(timeout(120 seconds), interval(10 second)) {
testUtils.pushData(testData)
testUtils.pushData(testData, aggregateTestData)
val modData = testData.map(_ + 5)
assert(collected === modData.toSet, "\nData received does not match data sent")
}
@@ -254,7 +254,7 @@ class KinesisStreamSuite extends KinesisFunSuite
// If this times out because numBatchesWithData is empty, then its likely that foreachRDD
// function failed with exceptions, and nothing got added to `collectedData`
eventually(timeout(2 minutes), interval(1 seconds)) {
testUtils.pushData(1 to 5)
testUtils.pushData(1 to 5, aggregateTestData)
assert(isCheckpointPresent && numBatchesWithData > 10)
}
ssc.stop(stopSparkContext = true) // stop the SparkContext so that the blocks are not reused
@@ -285,5 +285,8 @@
}
ssc.stop()
}

}

class WithAggregationKinesisStreamSuite extends KinesisStreamTests(aggregateTestData = true)

class WithoutAggregationKinesisStreamSuite extends KinesisStreamTests(aggregateTestData = false)
@@ -31,6 +31,8 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.document.DynamoDB
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model._
import com.amazonaws.services.kinesis.producer.{KinesisProducer, KinesisProducerConfiguration, UserRecordResult}
import com.google.common.util.concurrent.{FutureCallback, Futures}

import org.apache.spark.Logging

@@ -64,6 +66,16 @@ private[kinesis] class KinesisTestUtils extends Logging {
new DynamoDB(dynamoDBClient)
}

private lazy val kinesisProducer: KinesisProducer = {
val conf = new KinesisProducerConfiguration()
.setRecordMaxBufferedTime(1000)
.setMaxConnections(1)
.setRegion(regionName)
.setMetricsLevel("none")

new KinesisProducer(conf)
}

def streamName: String = {
require(streamCreated, "Stream not yet created, call createStream() to create one")
_streamName
@@ -90,22 +102,41 @@
* Push data to Kinesis stream and return a map of
* shardId -> seq of (data, seq number) pushed to corresponding shard
*/
def pushData(testData: Seq[Int]): Map[String, Seq[(Int, String)]] = {
def pushData(testData: Seq[Int], aggregate: Boolean): Map[String, Seq[(Int, String)]] = {
require(streamCreated, "Stream not yet created, call createStream() to create one")
val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]()

testData.foreach { num =>
val str = num.toString
val putRecordRequest = new PutRecordRequest().withStreamName(streamName)
.withData(ByteBuffer.wrap(str.getBytes()))
.withPartitionKey(str)

val putRecordResult = kinesisClient.putRecord(putRecordRequest)
val shardId = putRecordResult.getShardId
val seqNumber = putRecordResult.getSequenceNumber()
val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
new ArrayBuffer[(Int, String)]())
sentSeqNumbers += ((num, seqNumber))
val data = ByteBuffer.wrap(str.getBytes())
if (aggregate) {
val future = kinesisProducer.addUserRecord(streamName, str, data)
val kinesisCallBack = new FutureCallback[UserRecordResult]() {
override def onFailure(t: Throwable): Unit = {} // do nothing

override def onSuccess(result: UserRecordResult): Unit = {
val shardId = result.getShardId
Contributor: Can't this code be deduped with lines 134 to 138?

Contributor (author): Even though they have the same methods, they are of different types. UserRecordResult is not a subclass of PutRecordResult.

Contributor: aah, nvm then.

val seqNumber = result.getSequenceNumber()
val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
new ArrayBuffer[(Int, String)]())
sentSeqNumbers += ((num, seqNumber))
}
}

Futures.addCallback(future, kinesisCallBack)
kinesisProducer.flushSync() // make sure we send all data before returning the map
} else {
val putRecordRequest = new PutRecordRequest().withStreamName(streamName)
.withData(data)
.withPartitionKey(str)

val putRecordResult = kinesisClient.putRecord(putRecordRequest)
val shardId = putRecordResult.getShardId
val seqNumber = putRecordResult.getSequenceNumber()
val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
new ArrayBuffer[(Int, String)]())
sentSeqNumbers += ((num, seqNumber))
}
}

logInfo(s"Pushed $testData:\n\t ${shardIdToSeqNumbers.mkString("\n\t")}")
@@ -116,7 +147,7 @@
* Expose a Python friendly API.
*/
def pushData(testData: java.util.List[Int]): Unit = {
pushData(testData.asScala)
pushData(testData.asScala, aggregate = false)
}

def deleteStream(): Unit = {
2 changes: 2 additions & 0 deletions pom.xml
@@ -154,6 +154,8 @@
<jets3t.version>0.7.1</jets3t.version>
<aws.java.sdk.version>1.9.40</aws.java.sdk.version>
<aws.kinesis.client.version>1.4.0</aws.kinesis.client.version>
<!-- the producer is used in tests -->
<aws.kinesis.producer.version>0.10.1</aws.kinesis.producer.version>
<!-- org.apache.httpcomponents/httpclient-->
<commons.httpclient.version>4.3.2</commons.httpclient.version>
<!-- commons-httpclient/commons-httpclient-->
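The new aws.kinesis.producer.version property pins the KPL that the tests above use to produce aggregated records. For reference, a minimal sketch of the aggregated push path from KinesisTestUtils, restricted to the KPL and Guava calls that appear in the diff; the stream name, region, and data values are placeholders:

    import java.nio.ByteBuffer

    import com.amazonaws.services.kinesis.producer.{KinesisProducer, KinesisProducerConfiguration, UserRecordResult}
    import com.google.common.util.concurrent.{FutureCallback, Futures}

    object AggregatedPushSketch {
      def main(args: Array[String]): Unit = {
        val producer = new KinesisProducer(new KinesisProducerConfiguration()
          .setRecordMaxBufferedTime(1000) // let the KPL buffer and aggregate for up to 1 second
          .setMaxConnections(1)
          .setRegion("us-west-2")         // placeholder region
          .setMetricsLevel("none"))

        (1 to 10).foreach { num =>
          val str = num.toString
          // addUserRecord buffers the record; the KPL packs buffered user records
          // into aggregated Kinesis records behind the scenes.
          val future = producer.addUserRecord("test-stream", str, ByteBuffer.wrap(str.getBytes()))
          Futures.addCallback(future, new FutureCallback[UserRecordResult] {
            override def onFailure(t: Throwable): Unit = t.printStackTrace()
            override def onSuccess(result: UserRecordResult): Unit =
              println(s"$num -> shard ${result.getShardId}, seq ${result.getSequenceNumber}")
          })
        }

        producer.flushSync() // block until every buffered record has been sent
      }
    }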