Skip to content

[SPARK-14028][STREAMING][KINESIS][TESTS] Remove deprecated methods; fix two other warnings #11850

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -221,51 +221,6 @@ object KinesisUtils {
}
}

/**
 * Create an input DStream that receives data from a Kinesis stream via the
 * Kinesis Client Library (KCL).
 *
 * Note:
 *
 *  - AWS credentials are resolved on the workers through the
 *    DefaultAWSCredentialsProviderChain; consult the AWS documentation for the
 *    lookup order it uses.
 *  - DynamoDB and CloudWatch calls are issued against the region derived from
 *    `endpointUrl`.
 *  - The KCL application name is taken from the app name configured in
 *    [[org.apache.spark.SparkConf]].
 *
 * @param ssc StreamingContext object
 * @param streamName Kinesis stream name
 * @param endpointUrl Endpoint url of Kinesis service
 *                    (e.g., https://kinesis.us-east-1.amazonaws.com)
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
 *                           See the Kinesis Spark Streaming documentation for more
 *                           details on the different types of checkpoints.
 * @param initialPositionInStream When no Kinesis checkpoint is available, the
 *                                position a worker starts reading from: either
 *                                the oldest retained record, bounded by Kinesis'
 *                                24-hour limit (InitialPositionInStream.TRIM_HORIZON),
 *                                or the newest record (InitialPositionInStream.LATEST).
 * @param storageLevel Storage level to use for storing the received objects;
 *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
 */
@deprecated("use other forms of createStream", "1.4.0")
def createStream(
    ssc: StreamingContext,
    streamName: String,
    endpointUrl: String,
    checkpointInterval: Duration,
    initialPositionInStream: InitialPositionInStream,
    storageLevel: StorageLevel
  ): ReceiverInputDStream[Array[Byte]] = {
  ssc.withNamedScope("kinesis stream") {
    // Derive the region for DynamoDB/CloudWatch from the Kinesis endpoint.
    val regionName = getRegionByEndpoint(endpointUrl)
    new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl, regionName,
      initialPositionInStream, ssc.sc.appName, checkpointInterval, storageLevel,
      defaultMessageHandler, None)
  }
}

/**
* Create an input stream that pulls messages from a Kinesis stream.
* This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
Expand Down Expand Up @@ -453,47 +408,6 @@ object KinesisUtils {
defaultMessageHandler(_), awsAccessKeyId, awsSecretKey)
}

/**
 * Create an input DStream that receives data from a Kinesis stream via the
 * Kinesis Client Library (KCL). Java-friendly variant taking a
 * JavaStreamingContext; delegates to the Scala `createStream` overload.
 *
 * Note:
 *  - AWS credentials are resolved on the workers through the
 *    DefaultAWSCredentialsProviderChain; consult the AWS documentation for the
 *    lookup order it uses.
 *  - DynamoDB and CloudWatch calls are issued against the region derived from
 *    `endpointUrl`.
 *  - The KCL application name is taken from the app name configured in
 *    [[org.apache.spark.SparkConf]].
 *
 * @param jssc Java StreamingContext object
 * @param streamName Kinesis stream name
 * @param endpointUrl Endpoint url of Kinesis service
 *                    (e.g., https://kinesis.us-east-1.amazonaws.com)
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
 *                           See the Kinesis Spark Streaming documentation for more
 *                           details on the different types of checkpoints.
 * @param initialPositionInStream When no Kinesis checkpoint is available, the
 *                                position a worker starts reading from: either
 *                                the oldest retained record, bounded by Kinesis'
 *                                24-hour limit (InitialPositionInStream.TRIM_HORIZON),
 *                                or the newest record (InitialPositionInStream.LATEST).
 * @param storageLevel Storage level to use for storing the received objects;
 *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
 */
@deprecated("use other forms of createStream", "1.4.0")
def createStream(
    jssc: JavaStreamingContext,
    streamName: String,
    endpointUrl: String,
    checkpointInterval: Duration,
    initialPositionInStream: InitialPositionInStream,
    storageLevel: StorageLevel
  ): JavaReceiverInputDStream[Array[Byte]] = {
  // Unwrap the Java context and forward everything to the Scala API;
  // implicit conversion yields the JavaReceiverInputDStream.
  createStream(jssc.ssc, streamName, endpointUrl, checkpointInterval,
    initialPositionInStream, storageLevel)
}

/** Map a Kinesis endpoint URL to its AWS region name via the SDK's RegionUtils. */
private def getRegionByEndpoint(endpointUrl: String): String =
  RegionUtils.getRegionByEndpoint(endpointUrl).getName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.streaming.kinesis;

import com.amazonaws.regions.RegionUtils;
import com.amazonaws.services.kinesis.model.Record;
import org.junit.Test;

Expand All @@ -34,11 +35,13 @@
public class JavaKinesisStreamSuite extends LocalJavaStreamingContext {
@Test
public void testKinesisStream() {
// Tests the API, does not actually test data receiving
JavaDStream<byte[]> kinesisStream = KinesisUtils.createStream(ssc, "mySparkStream",
"https://kinesis.us-west-2.amazonaws.com", new Duration(2000),
InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2());
String dummyEndpointUrl = KinesisTestUtils.defaultEndpointUrl();
String dummyRegionName = RegionUtils.getRegionByEndpoint(dummyEndpointUrl).getName();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This JavaDStream is created by the deprecated & removed createStream method; so let's switch to another createStream method.

// Tests the API, does not actually test data receiving
JavaDStream<byte[]> kinesisStream = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
dummyEndpointUrl, dummyRegionName, InitialPositionInStream.LATEST, new Duration(2000),
StorageLevel.MEMORY_AND_DISK_2());
ssc.stop();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ trait KinesisFunSuite extends SparkFunSuite {
if (shouldRunTests) {
body
} else {
ignore(s"$message [enable by setting env var $envVarNameForEnablingTests=1]")()
ignore(s"$message [enable by setting env var $envVarNameForEnablingTests=1]")(())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without the (), scalac will add this () implicitly and then complain that this is probably not what we want. So let's add this () explicitly to erase this warning.

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,10 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
}

test("KinesisUtils API") {
// Tests the API, does not actually test data receiving
val kinesisStream1 = KinesisUtils.createStream(ssc, "mySparkStream",
dummyEndpointUrl, Seconds(2),
InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream",
val kinesisStream1 = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
dummyEndpointUrl, dummyRegionName,
InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2)
val kinesisStream3 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream",
val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This kinesisStream1 tests the deprecated & removed method; let's just remove it.

dummyEndpointUrl, dummyRegionName,
InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2,
dummyAWSAccessKey, dummyAWSSecretKey)
Expand Down Expand Up @@ -154,7 +150,9 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun

// Verify that KinesisBackedBlockRDD is generated even when there are no blocks
val emptyRDD = kinesisStream.createBlockRDD(time, Seq.empty)
emptyRDD shouldBe a [KinesisBackedBlockRDD[Array[Byte]]]
// Verify it's KinesisBackedBlockRDD[_] rather than KinesisBackedBlockRDD[Array[Byte]], because
// the type parameter will be erased at runtime
emptyRDD shouldBe a [KinesisBackedBlockRDD[_]]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KinesisBackedBlockRDD[Array[Byte]] will cause a warning because the type parameter will be erased at runtime anyway; so let's switch to KinesisBackedBlockRDD[_].

emptyRDD.partitions shouldBe empty

// Verify that the KinesisBackedBlockRDD has isBlockValid = false when blocks are invalid
Expand Down