From ddb739113b38d4e2f4617a105cd0ab071f21d4de Mon Sep 17 00:00:00 2001 From: Mike Prieto Date: Mon, 7 Oct 2024 15:53:39 -0400 Subject: [PATCH] docs: Add ingestion from GCS sample (#2211) * docs: Add ingestion from GCS Java sample * docs: Add ingestion from GCS sample * docs: Add test for GCS ingestion * docs: Move topicName declaration to satisfy style check * docs: Update bucket for Cloud Storage ingestion sample test --- ...TopicWithCloudStorageIngestionExample.java | 109 ++++++++++++++++++ .../src/test/java/pubsub/AdminIT.java | 48 ++++++-- 2 files changed, 150 insertions(+), 7 deletions(-) create mode 100644 samples/snippets/src/main/java/pubsub/CreateTopicWithCloudStorageIngestionExample.java diff --git a/samples/snippets/src/main/java/pubsub/CreateTopicWithCloudStorageIngestionExample.java b/samples/snippets/src/main/java/pubsub/CreateTopicWithCloudStorageIngestionExample.java new file mode 100644 index 000000000..d795cdc28 --- /dev/null +++ b/samples/snippets/src/main/java/pubsub/CreateTopicWithCloudStorageIngestionExample.java @@ -0,0 +1,109 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +// [START pubsub_create_topic_with_cloud_storage_ingestion] + +import com.google.cloud.pubsub.v1.TopicAdminClient; +import com.google.protobuf.util.Timestamps; +import com.google.pubsub.v1.IngestionDataSourceSettings; +import com.google.pubsub.v1.Topic; +import com.google.pubsub.v1.TopicName; +import java.io.IOException; +import java.text.ParseException; + +public class CreateTopicWithCloudStorageIngestionExample { + public static void main(String... args) throws Exception { + // TODO(developer): Replace these variables before running the sample. + String projectId = "your-project-id"; + String topicId = "your-topic-id"; + // Cloud Storage ingestion settings. + // bucket and inputFormat are required arguments. + String bucket = "your-bucket"; + String inputFormat = "text"; + String textDelimiter = "\n"; + String matchGlob = "**.txt"; + String minimumObjectCreateTime = "YYYY-MM-DDThh:mm:ssZ"; + + createTopicWithCloudStorageIngestionExample( + projectId, topicId, bucket, inputFormat, textDelimiter, matchGlob, minimumObjectCreateTime); + } + + public static void createTopicWithCloudStorageIngestionExample( + String projectId, + String topicId, + String bucket, + String inputFormat, + String textDelimiter, + String matchGlob, + String minimumObjectCreateTime) + throws IOException { + try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) { + IngestionDataSourceSettings.CloudStorage.Builder cloudStorageBuilder = + IngestionDataSourceSettings.CloudStorage.newBuilder().setBucket(bucket); + switch (inputFormat) { + case "text": + cloudStorageBuilder.setTextFormat( + IngestionDataSourceSettings.CloudStorage.TextFormat.newBuilder() + .setDelimiter(textDelimiter) + .build()); + break; + case "avro": + cloudStorageBuilder.setAvroFormat( + IngestionDataSourceSettings.CloudStorage.AvroFormat.getDefaultInstance()); + break; + case "pubsub_avro": + cloudStorageBuilder.setPubsubAvroFormat( + IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat.getDefaultInstance()); + break; + default: + throw new IllegalArgumentException( + "inputFormat must be in ('text', 'avro', 'pubsub_avro'); got value: " + inputFormat); + } + + if (matchGlob != null && !matchGlob.isEmpty()) { + cloudStorageBuilder.setMatchGlob(matchGlob); + } + + if (minimumObjectCreateTime != null && !minimumObjectCreateTime.isEmpty()) { + try { + cloudStorageBuilder.setMinimumObjectCreateTime(Timestamps.parse(minimumObjectCreateTime)); + } catch (ParseException e) { + System.err.println("Unable to parse timestamp: " + minimumObjectCreateTime); + } + } + + IngestionDataSourceSettings ingestionDataSourceSettings = + IngestionDataSourceSettings.newBuilder() + .setCloudStorage(cloudStorageBuilder.build()) + .build(); + + TopicName topicName = TopicName.of(projectId, topicId); + + Topic topic = + topicAdminClient.createTopic( + Topic.newBuilder() + .setName(topicName.toString()) + .setIngestionDataSourceSettings(ingestionDataSourceSettings) + .build()); + + System.out.println( + "Created topic with Cloud Storage ingestion settings: " + topic.getAllFields()); + } + } +} +// [END pubsub_create_topic_with_cloud_storage_ingestion] diff --git a/samples/snippets/src/test/java/pubsub/AdminIT.java b/samples/snippets/src/test/java/pubsub/AdminIT.java index a2eb7ebfd..fcfa74981 100644 --- a/samples/snippets/src/test/java/pubsub/AdminIT.java +++ b/samples/snippets/src/test/java/pubsub/AdminIT.java @@ -52,7 +52,9 @@ public class AdminIT { private static final String projectId = System.getenv("GOOGLE_CLOUD_PROJECT"); private static final String _suffix = UUID.randomUUID().toString(); private static final String topicId = "iam-topic-" + _suffix; - private static final String ingestionTopicId = "ingestion-topic-" + _suffix; + private static final String kinesisIngestionTopicId = "kinesis-ingestion-topic-" + _suffix; + private static final String cloudStorageIngestionTopicId = + "cloud-storage-ingestion-topic-" + _suffix; private static final String pullSubscriptionId = "iam-pull-subscription-" + _suffix; private static final String pushSubscriptionId = "iam-push-subscription-" + _suffix; private static final String orderedSubscriptionId = "iam-ordered-subscription-" + _suffix; @@ -75,9 +77,18 @@ public class AdminIT { private static final String awsRoleArn = "arn:aws:iam::111111111111:role/fake-role-name"; private static final String gcpServiceAccount = "fake-service-account@fake-gcp-project.iam.gserviceaccount.com"; + private static final String cloudStorageBucket = "pubsub-cloud-storage-bucket"; + private static final String cloudStorageInputFormat = "text"; + private static final String cloudStorageTextDelimiter = ","; + private static final String cloudStorageMatchGlob = "**.txt"; + private static final String cloudStorageMinimumObjectCreateTime = "1970-01-01T00:00:00Z"; + private static final String cloudStorageMinimumObjectCreateTimeSeconds = "0"; private static final TopicName topicName = TopicName.of(projectId, topicId); - private static final TopicName ingestionTopicName = TopicName.of(projectId, ingestionTopicId); + private static final TopicName kinesisIngestionTopicName = + TopicName.of(projectId, kinesisIngestionTopicId); + private static final TopicName cloudStorageIngestionTopicName = + TopicName.of(projectId, cloudStorageIngestionTopicId); private static final SubscriptionName pullSubscriptionName = SubscriptionName.of(projectId, pullSubscriptionId); private static final SubscriptionName pushSubscriptionName = @@ -304,9 +315,9 @@ public void testAdmin() throws Exception { bout.reset(); // Test create topic with Kinesis ingestion settings. CreateTopicWithKinesisIngestionExample.createTopicWithKinesisIngestionExample( - projectId, ingestionTopicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount); + projectId, kinesisIngestionTopicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount); assertThat(bout.toString()) - .contains("google.pubsub.v1.Topic.name=" + ingestionTopicName.toString()); + .contains("google.pubsub.v1.Topic.name=" + kinesisIngestionTopicName.toString()); assertThat(bout.toString()).contains(streamArn); assertThat(bout.toString()).contains(consumerArn); assertThat(bout.toString()).contains(awsRoleArn); @@ -315,9 +326,9 @@ public void testAdmin() throws Exception { bout.reset(); // Test update existing Kinesis ingestion settings. UpdateTopicTypeExample.updateTopicTypeExample( - projectId, ingestionTopicId, streamArn, consumerArn2, awsRoleArn, gcpServiceAccount); + projectId, kinesisIngestionTopicId, streamArn, consumerArn2, awsRoleArn, gcpServiceAccount); assertThat(bout.toString()) - .contains("google.pubsub.v1.Topic.name=" + ingestionTopicName.toString()); + .contains("google.pubsub.v1.Topic.name=" + kinesisIngestionTopicName.toString()); assertThat(bout.toString()).contains(streamArn); assertThat(bout.toString()).contains(consumerArn2); assertThat(bout.toString()).contains(awsRoleArn); @@ -325,7 +336,30 @@ public void testAdmin() throws Exception { bout.reset(); // Test delete Kinesis ingestion topic. - DeleteTopicExample.deleteTopicExample(projectId, ingestionTopicId); + DeleteTopicExample.deleteTopicExample(projectId, kinesisIngestionTopicId); + assertThat(bout.toString()).contains("Deleted topic."); + + bout.reset(); + // Test create topic with Cloud Storage ingestion settings. + CreateTopicWithCloudStorageIngestionExample.createTopicWithCloudStorageIngestionExample( + projectId, + cloudStorageIngestionTopicId, + cloudStorageBucket, + cloudStorageInputFormat, + cloudStorageTextDelimiter, + cloudStorageMatchGlob, + cloudStorageMinimumObjectCreateTime); + assertThat(bout.toString()) + .contains("google.pubsub.v1.Topic.name=" + cloudStorageIngestionTopicName.toString()); + assertThat(bout.toString()).contains(cloudStorageBucket); + assertThat(bout.toString()).contains(cloudStorageInputFormat); + assertThat(bout.toString()).contains(cloudStorageTextDelimiter); + assertThat(bout.toString()).contains(cloudStorageMatchGlob); + assertThat(bout.toString()).contains(cloudStorageMinimumObjectCreateTimeSeconds); + + bout.reset(); + // Test delete Cloud Storage ingestion topic. + DeleteTopicExample.deleteTopicExample(projectId, cloudStorageIngestionTopicId); assertThat(bout.toString()).contains("Deleted topic."); } }