From a7fd27ccfc79151e27269ebcfb62db4ba3ed03dc Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Thu, 23 Nov 2023 19:20:01 -0800 Subject: [PATCH] [HUDI-7086] Scaling gcs event source (#10073) - Scaling gcs event source --------- Co-authored-by: rmahindra123 --- .../utilities/config/CloudSourceConfig.java | 20 +++- .../utilities/sources/GcsEventsSource.java | 7 +- .../helpers/gcs/PubsubMessagesFetcher.java | 102 ++++++++++------ .../helpers/gcs/PubsubQueueClient.java | 80 +++++++++++++ .../gcs/TestPubsubMessagesFetcher.java | 110 ++++++++++++++++++ 5 files changed, 279 insertions(+), 40 deletions(-) create mode 100644 hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/PubsubQueueClient.java create mode 100644 hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/gcs/TestPubsubMessagesFetcher.java diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java index 007d36fc7042..81533d940a8c 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java @@ -53,7 +53,17 @@ public class CloudSourceConfig extends HoodieConfig { .defaultValue(10) .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.cloud.meta.batch.size") .markAdvanced() - .withDocumentation("Number of metadata messages to pull at a time"); + .withDocumentation("Number of metadata messages to pull in one API call to the cloud events queue. " + + "Multiple API calls with this batch size are sent to cloud events queue, until we consume hoodie.streamer.source.cloud.meta.max.num.messages.per.sync" + + "from the queue or hoodie.streamer.source.cloud.meta.max.fetch.time.per.sync.ms amount of time has passed or queue is empty. "); + + public static final ConfigProperty MAX_NUM_MESSAGES_PER_SYNC = ConfigProperty + .key(STREAMER_CONFIG_PREFIX + "source.cloud.meta.max.num.messages.per.sync") + .defaultValue(1000) + .markAdvanced() + .sinceVersion("0.14.1") + .withDocumentation("Maximum number of messages to consume per sync round. Multiple rounds of " + + BATCH_SIZE_CONF.key() + " could be invoked to reach max messages as configured by this config"); public static final ConfigProperty ACK_MESSAGES = ConfigProperty .key(STREAMER_CONFIG_PREFIX + "source.cloud.meta.ack") @@ -137,4 +147,12 @@ public class CloudSourceConfig extends HoodieConfig { .sinceVersion("0.14.1") .withDocumentation("specify this value in bytes, to coalesce partitions of source dataset not greater than specified limit"); + public static final ConfigProperty MAX_FETCH_TIME_PER_SYNC_MS = ConfigProperty + .key(STREAMER_CONFIG_PREFIX + "source.cloud.meta.max.fetch.time.per.sync.ms") + .defaultValue(1) + .markAdvanced() + .sinceVersion("0.14.1") + .withDocumentation("Max time in millis to consume " + MAX_NUM_MESSAGES_PER_SYNC.key() + " messages from cloud queue. Cloud event queues like SQS, " + + "PubSub can return empty responses even when messages are available the queue, this config ensures we don't wait forever " + + "to consume MAX_MESSAGES_CONF messages, but time out and move on further."); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java index f934f2794989..897771168edf 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java @@ -49,6 +49,8 @@ import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys; import static org.apache.hudi.utilities.config.CloudSourceConfig.ACK_MESSAGES; import static org.apache.hudi.utilities.config.CloudSourceConfig.BATCH_SIZE_CONF; +import static org.apache.hudi.utilities.config.CloudSourceConfig.MAX_FETCH_TIME_PER_SYNC_MS; +import static org.apache.hudi.utilities.config.CloudSourceConfig.MAX_NUM_MESSAGES_PER_SYNC; import static org.apache.hudi.utilities.config.GCSEventsSourceConfig.GOOGLE_PROJECT_ID; import static org.apache.hudi.utilities.config.GCSEventsSourceConfig.PUBSUB_SUBSCRIPTION_ID; import static org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity.ProcessingDecision.DO_SKIP; @@ -117,8 +119,9 @@ public GcsEventsSource(TypedProperties props, JavaSparkContext jsc, SparkSession new PubsubMessagesFetcher( getStringWithAltKeys(props, GOOGLE_PROJECT_ID), getStringWithAltKeys(props, PUBSUB_SUBSCRIPTION_ID), - getIntWithAltKeys(props, BATCH_SIZE_CONF) - ) + getIntWithAltKeys(props, BATCH_SIZE_CONF), + getIntWithAltKeys(props, MAX_NUM_MESSAGES_PER_SYNC), + getIntWithAltKeys(props, MAX_FETCH_TIME_PER_SYNC_MS)) ); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/PubsubMessagesFetcher.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/PubsubMessagesFetcher.java index 886b60cce7cc..3b574045d7aa 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/PubsubMessagesFetcher.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/PubsubMessagesFetcher.java @@ -20,21 +20,25 @@ import org.apache.hudi.exception.HoodieException; -import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub; import com.google.cloud.pubsub.v1.stub.SubscriberStub; import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings; -import com.google.pubsub.v1.AcknowledgeRequest; import com.google.pubsub.v1.ProjectSubscriptionName; -import com.google.pubsub.v1.PullRequest; import com.google.pubsub.v1.PullResponse; import com.google.pubsub.v1.ReceivedMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.IntStream; -import static com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub.create; import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DEFAULT_MAX_INBOUND_MESSAGE_SIZE; /** @@ -42,18 +46,31 @@ */ public class PubsubMessagesFetcher { + private static final int DEFAULT_BATCH_SIZE_ACK_API = 10; + private static final long MAX_WAIT_TIME_TO_ACK_MESSAGES = TimeUnit.MINUTES.toMillis(1); + private static final int ACK_PRODUCER_THREAD_POOL_SIZE = 3; + + private final ExecutorService threadPool = Executors.newFixedThreadPool(ACK_PRODUCER_THREAD_POOL_SIZE); private final String googleProjectId; private final String pubsubSubscriptionId; private final int batchSize; + private final int maxMessagesPerSync; + private final long maxFetchTimePerSync; private final SubscriberStubSettings subscriberStubSettings; + private final PubsubQueueClient pubsubQueueClient; private static final Logger LOG = LoggerFactory.getLogger(PubsubMessagesFetcher.class); - public PubsubMessagesFetcher(String googleProjectId, String pubsubSubscriptionId, int batchSize) { + public PubsubMessagesFetcher(String googleProjectId, String pubsubSubscriptionId, int batchSize, + int maxMessagesPerSync, + long maxFetchTimePerSync, + PubsubQueueClient pubsubQueueClient) { this.googleProjectId = googleProjectId; this.pubsubSubscriptionId = pubsubSubscriptionId; this.batchSize = batchSize; + this.maxMessagesPerSync = maxMessagesPerSync; + this.maxFetchTimePerSync = maxFetchTimePerSync; try { /** For details of timeout and retry configs, @@ -69,49 +86,60 @@ public PubsubMessagesFetcher(String googleProjectId, String pubsubSubscriptionId } catch (IOException e) { throw new HoodieException("Error creating subscriber stub settings", e); } + this.pubsubQueueClient = pubsubQueueClient; + } + + public PubsubMessagesFetcher( + String googleProjectId, + String pubsubSubscriptionId, + int batchSize, + int maxMessagesPerSync, + long maxFetchTimePerSync) { + this( + googleProjectId, + pubsubSubscriptionId, + batchSize, + maxMessagesPerSync, + maxFetchTimePerSync, + new PubsubQueueClient() + ); } public List fetchMessages() { - try { - try (SubscriberStub subscriber = createSubscriber()) { - String subscriptionName = getSubscriptionName(); - PullResponse pullResponse = makePullRequest(subscriber, subscriptionName); - return pullResponse.getReceivedMessagesList(); + List messageList = new ArrayList<>(); + try (SubscriberStub subscriber = pubsubQueueClient.getSubscriber(subscriberStubSettings)) { + String subscriptionName = ProjectSubscriptionName.format(googleProjectId, pubsubSubscriptionId); + long startTime = System.currentTimeMillis(); + long unAckedMessages = pubsubQueueClient.getNumUnAckedMessages(this.pubsubSubscriptionId); + LOG.info("Found unacked messages " + unAckedMessages); + while (messageList.size() < unAckedMessages && messageList.size() < maxMessagesPerSync && (System.currentTimeMillis() - startTime < maxFetchTimePerSync)) { + PullResponse pullResponse = pubsubQueueClient.makePullRequest(subscriber, subscriptionName, batchSize); + messageList.addAll(pullResponse.getReceivedMessagesList()); } - } catch (IOException e) { + return messageList; + } catch (Exception e) { throw new HoodieException("Error when fetching metadata", e); } } public void sendAcks(List messagesToAck) throws IOException { - String subscriptionName = getSubscriptionName(); - try (SubscriberStub subscriber = createSubscriber()) { - - AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder() - .setSubscription(subscriptionName) - .addAllAckIds(messagesToAck) - .build(); - - subscriber.acknowledgeCallable().call(acknowledgeRequest); - - LOG.info("Acknowledged messages: " + messagesToAck); + try (SubscriberStub subscriber = pubsubQueueClient.getSubscriber(subscriberStubSettings)) { + int numberOfBatches = (int) Math.ceil((double) messagesToAck.size() / DEFAULT_BATCH_SIZE_ACK_API); + CompletableFuture.allOf(IntStream.range(0, numberOfBatches) + .parallel() + .boxed() + .map(batchIndex -> getTask(subscriber, messagesToAck, batchIndex)).toArray(CompletableFuture[]::new)) + .get(MAX_WAIT_TIME_TO_ACK_MESSAGES, TimeUnit.MILLISECONDS); + LOG.debug("Flushed out all outstanding acknowledged messages: " + messagesToAck.size()); + } catch (ExecutionException | InterruptedException | TimeoutException e) { + throw new IOException("Failed to ack messages from PubSub", e); } } - private PullResponse makePullRequest(SubscriberStub subscriber, String subscriptionName) { - PullRequest pullRequest = PullRequest.newBuilder() - .setMaxMessages(batchSize) - .setSubscription(subscriptionName) - .build(); - - return subscriber.pullCallable().call(pullRequest); - } - - private GrpcSubscriberStub createSubscriber() throws IOException { - return create(subscriberStubSettings); - } - - private String getSubscriptionName() { - return ProjectSubscriptionName.format(googleProjectId, pubsubSubscriptionId); + private CompletableFuture getTask(SubscriberStub subscriber, List messagesToAck, int batchIndex) { + String subscriptionName = ProjectSubscriptionName.format(googleProjectId, pubsubSubscriptionId); + List messages = messagesToAck.subList(batchIndex, Math.min(batchIndex + DEFAULT_BATCH_SIZE_ACK_API, messagesToAck.size())); + return CompletableFuture.runAsync(() -> pubsubQueueClient.makeAckRequest(subscriber, subscriptionName, messages), threadPool); } } + diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/PubsubQueueClient.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/PubsubQueueClient.java new file mode 100644 index 000000000000..7f93d32b6068 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/PubsubQueueClient.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources.helpers.gcs; + +import com.google.cloud.ServiceOptions; +import com.google.cloud.monitoring.v3.MetricServiceClient; +import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub; +import com.google.cloud.pubsub.v1.stub.SubscriberStub; +import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings; +import com.google.monitoring.v3.ListTimeSeriesRequest; +import com.google.monitoring.v3.Point; +import com.google.monitoring.v3.ProjectName; +import com.google.monitoring.v3.TimeInterval; +import com.google.protobuf.util.Timestamps; +import com.google.pubsub.v1.AcknowledgeRequest; +import com.google.pubsub.v1.PullRequest; +import com.google.pubsub.v1.PullResponse; + +import java.io.IOException; +import java.time.Instant; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class PubsubQueueClient { + private static final String METRIC_FILTER_PATTERN = "metric.type=\"pubsub.googleapis.com/subscription/%s\" AND resource.label.subscription_id=\"%s\""; + private static final String NUM_UNDELIVERED_MESSAGES = "num_undelivered_messages"; + + public SubscriberStub getSubscriber(SubscriberStubSettings subscriberStubSettings) throws IOException { + return GrpcSubscriberStub.create(subscriberStubSettings); + } + + public PullResponse makePullRequest(SubscriberStub subscriber, String subscriptionName, int batchSize) throws IOException { + PullRequest pullRequest = PullRequest.newBuilder() + .setMaxMessages(batchSize) + .setSubscription(subscriptionName) + .build(); + return subscriber.pullCallable().call(pullRequest); + } + + public void makeAckRequest(SubscriberStub subscriber, String subscriptionName, List messages) { + AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder() + .setSubscription(subscriptionName) + .addAllAckIds(messages) + .build(); + subscriber.acknowledgeCallable().call(acknowledgeRequest); + } + + public long getNumUnAckedMessages(String subscriptionId) throws IOException { + try (MetricServiceClient metricServiceClient = MetricServiceClient.create()) { + MetricServiceClient.ListTimeSeriesPagedResponse response = metricServiceClient.listTimeSeries( + ListTimeSeriesRequest.newBuilder() + .setName(ProjectName.of(ServiceOptions.getDefaultProjectId()).toString()) + .setFilter(String.format(METRIC_FILTER_PATTERN, NUM_UNDELIVERED_MESSAGES, subscriptionId)) + .setInterval(TimeInterval.newBuilder() + .setStartTime(Timestamps.fromSeconds(Instant.now().getEpochSecond() - TimeUnit.MINUTES.toSeconds(2))) + .setEndTime(Timestamps.fromSeconds(Instant.now().getEpochSecond())) + .build()) + .build()); + // use the latest value from the window + List pointList = response.getPage().getValues().iterator().next().getPointsList(); + return pointList.stream().findFirst().map(point -> point.getValue().getInt64Value()).orElse(Long.MAX_VALUE); + } + } +} \ No newline at end of file diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/gcs/TestPubsubMessagesFetcher.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/gcs/TestPubsubMessagesFetcher.java new file mode 100644 index 000000000000..2122dfa7af45 --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/gcs/TestPubsubMessagesFetcher.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources.helpers.gcs; + +import com.google.cloud.pubsub.v1.stub.SubscriberStub; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.PullResponse; +import com.google.pubsub.v1.ReceivedMessage; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TestPubsubMessagesFetcher { + private static final String PROJECT_ID = "test-project"; + private static final String SUBSCRIPTION_ID = "test-subscription"; + private static final String SUBSCRIPTION_NAME = ProjectSubscriptionName.format(PROJECT_ID, SUBSCRIPTION_ID); + private static final int SMALL_BATCH_SIZE = 1; + private static final int MAX_MESSAGES_IN_REQUEST = 1000; + private static final long MAX_WAIT_TIME_IN_REQUEST = TimeUnit.SECONDS.toMillis(1); + + private final SubscriberStub mockSubscriber = Mockito.mock(SubscriberStub.class); + private final PubsubQueueClient mockPubsubQueueClient = Mockito.mock(PubsubQueueClient.class); + + @Test + public void testFetchMessages() throws IOException { + doNothing().when(mockSubscriber).close(); + when(mockPubsubQueueClient.getSubscriber(any())).thenReturn(mockSubscriber); + when(mockPubsubQueueClient.getNumUnAckedMessages(SUBSCRIPTION_ID)).thenReturn(3L); + doNothing().when(mockSubscriber).close(); + ReceivedMessage message1 = ReceivedMessage.newBuilder().setAckId("1").setMessage(PubsubMessage.newBuilder().setMessageId("msgId1").build()).build(); + ReceivedMessage message2 = ReceivedMessage.newBuilder().setAckId("2").setMessage(PubsubMessage.newBuilder().setMessageId("msgId2").build()).build(); + ReceivedMessage message3 = ReceivedMessage.newBuilder().setAckId("3").setMessage(PubsubMessage.newBuilder().setMessageId("msgId3").build()).build(); + when(mockPubsubQueueClient.makePullRequest(mockSubscriber, SUBSCRIPTION_NAME, SMALL_BATCH_SIZE)) + .thenReturn(PullResponse.newBuilder().addReceivedMessages(message1).build()) + .thenReturn(PullResponse.newBuilder().addReceivedMessages(message2).build()) + .thenReturn(PullResponse.newBuilder().addReceivedMessages(message3).build()); + + PubsubMessagesFetcher fetcher = new PubsubMessagesFetcher( + PROJECT_ID, SUBSCRIPTION_ID, SMALL_BATCH_SIZE, + MAX_MESSAGES_IN_REQUEST, MAX_WAIT_TIME_IN_REQUEST, mockPubsubQueueClient + ); + List messages = fetcher.fetchMessages(); + + assertEquals(3, messages.size()); + assertEquals("1", messages.get(0).getAckId()); + assertEquals("2", messages.get(1).getAckId()); + assertEquals("3", messages.get(2).getAckId()); + verify(mockPubsubQueueClient, times(3)).makePullRequest(mockSubscriber, SUBSCRIPTION_NAME, SMALL_BATCH_SIZE); + } + + @Test + public void testFetchMessagesZeroTimeout() throws IOException { + doNothing().when(mockSubscriber).close(); + when(mockPubsubQueueClient.getSubscriber(any())).thenReturn(mockSubscriber); + when(mockPubsubQueueClient.getNumUnAckedMessages(SUBSCRIPTION_ID)).thenReturn(100L); + PubsubMessagesFetcher fetcher = new PubsubMessagesFetcher( + PROJECT_ID, SUBSCRIPTION_ID, SMALL_BATCH_SIZE, + MAX_MESSAGES_IN_REQUEST, 0, mockPubsubQueueClient + ); + + List messages = fetcher.fetchMessages(); + assertEquals(0, messages.size()); + } + + @Test + public void testSendAcks() throws IOException { + doNothing().when(mockSubscriber).close(); + when(mockPubsubQueueClient.getSubscriber(any())).thenReturn(mockSubscriber); + List messageAcks = IntStream.range(0, 20).mapToObj(i -> "msg_" + i).collect(Collectors.toList()); + doNothing().when(mockPubsubQueueClient).makeAckRequest(eq(mockSubscriber), eq(SUBSCRIPTION_NAME), any()); + PubsubMessagesFetcher fetcher = new PubsubMessagesFetcher( + PROJECT_ID, SUBSCRIPTION_ID, SMALL_BATCH_SIZE, + MAX_MESSAGES_IN_REQUEST, MAX_WAIT_TIME_IN_REQUEST, mockPubsubQueueClient + ); + + fetcher.sendAcks(messageAcks); + verify(mockPubsubQueueClient, times(2)).makeAckRequest(eq(mockSubscriber), eq(SUBSCRIPTION_NAME), any()); + } + +} \ No newline at end of file