Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions v2/kinesis-to-pubsub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,7 @@

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-amazon-web-services</artifactId>
<version>${beam.version}</version>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-kinesis</artifactId>
<artifactId>beam-sdks-java-io-amazon-web-services2</artifactId>
<version>${beam.version}</version>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@

import com.google.cloud.teleport.metadata.TemplateParameter;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.io.aws.options.AwsOptions;
import org.apache.beam.sdk.io.aws2.options.AwsOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.Validation;
import software.amazon.awssdk.regions.Region;

/**
* The {@link KinesisToPubsubOptions} interface provides the custom execution options passed by the
Expand Down Expand Up @@ -56,9 +57,9 @@ public interface KinesisToPubsubOptions
helpText = "AWS Region")
@Validation.Required
@Default.InstanceFactory(AwsRegionFactory.class)
String getAwsRegion();
Region getAwsRegion();

void setAwsRegion(String awsRegion);
void setAwsRegion(Region awsRegion);

@TemplateParameter.Text(
order = 4,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package com.google.cloud.teleport.v2.templates;

import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.google.cloud.secretmanager.v1.SecretVersionName;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
Expand All @@ -26,12 +24,16 @@
import javax.annotation.Nonnull;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.aws2.common.ClientConfiguration;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.kinesis.KinesisIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.ParDo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.kinesis.common.InitialPositionInStream;

/**
* The {@link KinesisToPubsub} is a streaming pipeline which reads data from AWS Kinesis Data stream
Expand Down Expand Up @@ -125,7 +127,8 @@

// Create the pipeline
Pipeline pipeline = Pipeline.create(options);

AwsBasicCredentials creds = AwsBasicCredentials.create(awsKeyId, awsSecretAccessKey);
StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds);

Check warning on line 131 in v2/kinesis-to-pubsub/src/main/java/com/google/cloud/teleport/v2/templates/KinesisToPubsub.java

View check run for this annotation

Codecov / codecov/patch

v2/kinesis-to-pubsub/src/main/java/com/google/cloud/teleport/v2/templates/KinesisToPubsub.java#L130-L131

Added lines #L130 - L131 were not covered by tests
/*
* Steps:
* 1) Read messages in from Kinesis
Expand All @@ -137,8 +140,11 @@
"Read Kinesis Datastream",
KinesisIO.read()
.withStreamName(kinesisDatastreamName)
.withAWSClientsProvider(
awsKeyId, awsSecretAccessKey, Regions.fromName(options.getAwsRegion()))
.withClientConfiguration(
ClientConfiguration.builder()
.credentialsProvider(provider)
.region(options.getAwsRegion())
.build())

Check warning on line 147 in v2/kinesis-to-pubsub/src/main/java/com/google/cloud/teleport/v2/templates/KinesisToPubsub.java

View check run for this annotation

Codecov / codecov/patch

v2/kinesis-to-pubsub/src/main/java/com/google/cloud/teleport/v2/templates/KinesisToPubsub.java#L143-L147

Added lines #L143 - L147 were not covered by tests
.withInitialPositionInStream(InitialPositionInStream.LATEST))
.apply("Extract String message", ParDo.of(new KinesisDataTransforms.ExtractStringFn()))
.apply("PubsubSink", PubsubIO.writeStrings().to(options.getOutputPubsubTopic()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package com.google.cloud.teleport.v2.transforms;

import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.io.kinesis.KinesisRecord;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Loading