[HUDI-7086] Scaling gcs event source (apache#10073)
-  Scaling gcs event source

---------

Co-authored-by: rmahindra123 <rmahindra@Rajeshs-MacBook-Pro.local>
nsivabalan and rmahindra123 authored Nov 24, 2023
1 parent bb42c4b commit a7fd27c
Showing 5 changed files with 279 additions and 40 deletions.
CloudSourceConfig.java
@@ -53,7 +53,17 @@ public class CloudSourceConfig extends HoodieConfig {
.defaultValue(10)
.withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.cloud.meta.batch.size")
.markAdvanced()
.withDocumentation("Number of metadata messages to pull at a time");
.withDocumentation("Number of metadata messages to pull in one API call to the cloud events queue. "
+ "Multiple API calls with this batch size are sent to cloud events queue, until we consume hoodie.streamer.source.cloud.meta.max.num.messages.per.sync"
+ "from the queue or hoodie.streamer.source.cloud.meta.max.fetch.time.per.sync.ms amount of time has passed or queue is empty. ");

public static final ConfigProperty<Integer> MAX_NUM_MESSAGES_PER_SYNC = ConfigProperty
.key(STREAMER_CONFIG_PREFIX + "source.cloud.meta.max.num.messages.per.sync")
.defaultValue(1000)
.markAdvanced()
.sinceVersion("0.14.1")
.withDocumentation("Maximum number of messages to consume per sync round. Multiple rounds of "
+ BATCH_SIZE_CONF.key() + " could be invoked to reach max messages as configured by this config");

public static final ConfigProperty<Boolean> ACK_MESSAGES = ConfigProperty
.key(STREAMER_CONFIG_PREFIX + "source.cloud.meta.ack")
@@ -137,4 +147,12 @@ public class CloudSourceConfig extends HoodieConfig {
.sinceVersion("0.14.1")
.withDocumentation("specify this value in bytes, to coalesce partitions of source dataset not greater than specified limit");

public static final ConfigProperty<Integer> MAX_FETCH_TIME_PER_SYNC_MS = ConfigProperty
.key(STREAMER_CONFIG_PREFIX + "source.cloud.meta.max.fetch.time.per.sync.ms")
.defaultValue(1)
.markAdvanced()
.sinceVersion("0.14.1")
.withDocumentation("Max time in millis to consume " + MAX_NUM_MESSAGES_PER_SYNC.key() + " messages from cloud queue. Cloud event queues like SQS, "
+ "PubSub can return empty responses even when messages are available the queue, this config ensures we don't wait forever "
+ "to consume MAX_MESSAGES_CONF messages, but time out and move on further.");
}
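
For reference, a minimal tuning sketch (not part of this patch; the property keys assume the hoodie.streamer. prefix used in the documentation strings above): each Pub/Sub pull requests at most the configured batch size, and the fetch loop keeps pulling until either the per-sync message cap or the per-sync time budget is hit.

import org.apache.hudi.common.config.TypedProperties;

public class GcsSourceTuningSketch {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();
    // Messages requested per Pub/Sub pull call (BATCH_SIZE_CONF).
    props.setProperty("hoodie.streamer.source.cloud.meta.batch.size", "20");
    // Upper bound on messages buffered in one sync round (MAX_NUM_MESSAGES_PER_SYNC).
    props.setProperty("hoodie.streamer.source.cloud.meta.max.num.messages.per.sync", "1000");
    // Time budget in millis for one sync round (MAX_FETCH_TIME_PER_SYNC_MS).
    props.setProperty("hoodie.streamer.source.cloud.meta.max.fetch.time.per.sync.ms", "60000");
    System.out.println(props);
  }
}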
GcsEventsSource.java
@@ -49,6 +49,8 @@
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static org.apache.hudi.utilities.config.CloudSourceConfig.ACK_MESSAGES;
import static org.apache.hudi.utilities.config.CloudSourceConfig.BATCH_SIZE_CONF;
import static org.apache.hudi.utilities.config.CloudSourceConfig.MAX_FETCH_TIME_PER_SYNC_MS;
import static org.apache.hudi.utilities.config.CloudSourceConfig.MAX_NUM_MESSAGES_PER_SYNC;
import static org.apache.hudi.utilities.config.GCSEventsSourceConfig.GOOGLE_PROJECT_ID;
import static org.apache.hudi.utilities.config.GCSEventsSourceConfig.PUBSUB_SUBSCRIPTION_ID;
import static org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity.ProcessingDecision.DO_SKIP;
@@ -117,8 +119,9 @@ public GcsEventsSource(TypedProperties props, JavaSparkContext jsc, SparkSession
new PubsubMessagesFetcher(
getStringWithAltKeys(props, GOOGLE_PROJECT_ID),
getStringWithAltKeys(props, PUBSUB_SUBSCRIPTION_ID),
getIntWithAltKeys(props, BATCH_SIZE_CONF)
)
getIntWithAltKeys(props, BATCH_SIZE_CONF),
getIntWithAltKeys(props, MAX_NUM_MESSAGES_PER_SYNC),
getIntWithAltKeys(props, MAX_FETCH_TIME_PER_SYNC_MS))
);
}

PubsubMessagesFetcher.java
@@ -20,40 +20,57 @@

import org.apache.hudi.exception.HoodieException;

import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;

import static com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub.create;
import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DEFAULT_MAX_INBOUND_MESSAGE_SIZE;

/**
* Fetch messages from a specified Google Cloud Pubsub subscription.
*/
public class PubsubMessagesFetcher {

private static final int DEFAULT_BATCH_SIZE_ACK_API = 10;
private static final long MAX_WAIT_TIME_TO_ACK_MESSAGES = TimeUnit.MINUTES.toMillis(1);
private static final int ACK_PRODUCER_THREAD_POOL_SIZE = 3;

private final ExecutorService threadPool = Executors.newFixedThreadPool(ACK_PRODUCER_THREAD_POOL_SIZE);
private final String googleProjectId;
private final String pubsubSubscriptionId;

private final int batchSize;
private final int maxMessagesPerSync;
private final long maxFetchTimePerSync;
private final SubscriberStubSettings subscriberStubSettings;
private final PubsubQueueClient pubsubQueueClient;

private static final Logger LOG = LoggerFactory.getLogger(PubsubMessagesFetcher.class);

public PubsubMessagesFetcher(String googleProjectId, String pubsubSubscriptionId, int batchSize) {
public PubsubMessagesFetcher(String googleProjectId, String pubsubSubscriptionId, int batchSize,
int maxMessagesPerSync,
long maxFetchTimePerSync,
PubsubQueueClient pubsubQueueClient) {
this.googleProjectId = googleProjectId;
this.pubsubSubscriptionId = pubsubSubscriptionId;
this.batchSize = batchSize;
this.maxMessagesPerSync = maxMessagesPerSync;
this.maxFetchTimePerSync = maxFetchTimePerSync;

try {
/** For details of timeout and retry configs,
@@ -69,49 +86,60 @@ public PubsubMessagesFetcher(String googleProjectId, String pubsubSubscriptionId
} catch (IOException e) {
throw new HoodieException("Error creating subscriber stub settings", e);
}
this.pubsubQueueClient = pubsubQueueClient;
}

public PubsubMessagesFetcher(
String googleProjectId,
String pubsubSubscriptionId,
int batchSize,
int maxMessagesPerSync,
long maxFetchTimePerSync) {
this(
googleProjectId,
pubsubSubscriptionId,
batchSize,
maxMessagesPerSync,
maxFetchTimePerSync,
new PubsubQueueClient()
);
}

public List<ReceivedMessage> fetchMessages() {
try {
try (SubscriberStub subscriber = createSubscriber()) {
String subscriptionName = getSubscriptionName();
PullResponse pullResponse = makePullRequest(subscriber, subscriptionName);
return pullResponse.getReceivedMessagesList();
List<ReceivedMessage> messageList = new ArrayList<>();
try (SubscriberStub subscriber = pubsubQueueClient.getSubscriber(subscriberStubSettings)) {
String subscriptionName = ProjectSubscriptionName.format(googleProjectId, pubsubSubscriptionId);
long startTime = System.currentTimeMillis();
long unAckedMessages = pubsubQueueClient.getNumUnAckedMessages(this.pubsubSubscriptionId);
LOG.info("Found unacked messages " + unAckedMessages);
while (messageList.size() < unAckedMessages && messageList.size() < maxMessagesPerSync && (System.currentTimeMillis() - startTime < maxFetchTimePerSync)) {
PullResponse pullResponse = pubsubQueueClient.makePullRequest(subscriber, subscriptionName, batchSize);
messageList.addAll(pullResponse.getReceivedMessagesList());
}
} catch (IOException e) {
return messageList;
} catch (Exception e) {
throw new HoodieException("Error when fetching metadata", e);
}
}

public void sendAcks(List<String> messagesToAck) throws IOException {
String subscriptionName = getSubscriptionName();
try (SubscriberStub subscriber = createSubscriber()) {

AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder()
.setSubscription(subscriptionName)
.addAllAckIds(messagesToAck)
.build();

subscriber.acknowledgeCallable().call(acknowledgeRequest);

LOG.info("Acknowledged messages: " + messagesToAck);
try (SubscriberStub subscriber = pubsubQueueClient.getSubscriber(subscriberStubSettings)) {
int numberOfBatches = (int) Math.ceil((double) messagesToAck.size() / DEFAULT_BATCH_SIZE_ACK_API);
CompletableFuture.allOf(IntStream.range(0, numberOfBatches)
.parallel()
.boxed()
.map(batchIndex -> getTask(subscriber, messagesToAck, batchIndex)).toArray(CompletableFuture[]::new))
.get(MAX_WAIT_TIME_TO_ACK_MESSAGES, TimeUnit.MILLISECONDS);
LOG.debug("Flushed out all outstanding acknowledged messages: " + messagesToAck.size());
} catch (ExecutionException | InterruptedException | TimeoutException e) {
throw new IOException("Failed to ack messages from PubSub", e);
}
}

private PullResponse makePullRequest(SubscriberStub subscriber, String subscriptionName) {
PullRequest pullRequest = PullRequest.newBuilder()
.setMaxMessages(batchSize)
.setSubscription(subscriptionName)
.build();

return subscriber.pullCallable().call(pullRequest);
}

private GrpcSubscriberStub createSubscriber() throws IOException {
return create(subscriberStubSettings);
}

private String getSubscriptionName() {
return ProjectSubscriptionName.format(googleProjectId, pubsubSubscriptionId);
private CompletableFuture<Void> getTask(SubscriberStub subscriber, List<String> messagesToAck, int batchIndex) {
String subscriptionName = ProjectSubscriptionName.format(googleProjectId, pubsubSubscriptionId);
// each ack task handles one contiguous slice of DEFAULT_BATCH_SIZE_ACK_API ack ids
List<String> messages = messagesToAck.subList(batchIndex * DEFAULT_BATCH_SIZE_ACK_API, Math.min((batchIndex + 1) * DEFAULT_BATCH_SIZE_ACK_API, messagesToAck.size()));
return CompletableFuture.runAsync(() -> pubsubQueueClient.makeAckRequest(subscriber, subscriptionName, messages), threadPool);
}
}
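
A hedged usage sketch of the reworked fetcher as wired by GcsEventsSource (the project and subscription ids below are hypothetical): messages are pulled in batchSize chunks until maxMessagesPerSync or maxFetchTimePerSync is reached, and acks are flushed in parallel slices of DEFAULT_BATCH_SIZE_ACK_API.

import org.apache.hudi.utilities.sources.helpers.gcs.PubsubMessagesFetcher;

import com.google.pubsub.v1.ReceivedMessage;

import java.util.List;
import java.util.stream.Collectors;

public class PubsubFetcherUsageSketch {
  public static void main(String[] args) throws Exception {
    PubsubMessagesFetcher fetcher = new PubsubMessagesFetcher(
        "my-gcp-project",              // hypothetical Google project id
        "my-gcs-events-subscription",  // hypothetical Pub/Sub subscription id
        20,                            // batchSize per pull request
        1000,                          // maxMessagesPerSync
        60_000);                       // maxFetchTimePerSync in millis
    List<ReceivedMessage> messages = fetcher.fetchMessages();
    // ... translate the messages into cloud object metadata and commit the batch, then ack ...
    List<String> ackIds = messages.stream().map(ReceivedMessage::getAckId).collect(Collectors.toList());
    fetcher.sendAcks(ackIds);
  }
}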

PubsubQueueClient.java
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.utilities.sources.helpers.gcs;

import com.google.cloud.ServiceOptions;
import com.google.cloud.monitoring.v3.MetricServiceClient;
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.monitoring.v3.ListTimeSeriesRequest;
import com.google.monitoring.v3.Point;
import com.google.monitoring.v3.ProjectName;
import com.google.monitoring.v3.TimeInterval;
import com.google.protobuf.util.Timestamps;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;

import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class PubsubQueueClient {
private static final String METRIC_FILTER_PATTERN = "metric.type=\"pubsub.googleapis.com/subscription/%s\" AND resource.label.subscription_id=\"%s\"";
private static final String NUM_UNDELIVERED_MESSAGES = "num_undelivered_messages";

public SubscriberStub getSubscriber(SubscriberStubSettings subscriberStubSettings) throws IOException {
return GrpcSubscriberStub.create(subscriberStubSettings);
}

public PullResponse makePullRequest(SubscriberStub subscriber, String subscriptionName, int batchSize) throws IOException {
PullRequest pullRequest = PullRequest.newBuilder()
.setMaxMessages(batchSize)
.setSubscription(subscriptionName)
.build();
return subscriber.pullCallable().call(pullRequest);
}

public void makeAckRequest(SubscriberStub subscriber, String subscriptionName, List<String> messages) {
AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder()
.setSubscription(subscriptionName)
.addAllAckIds(messages)
.build();
subscriber.acknowledgeCallable().call(acknowledgeRequest);
}

public long getNumUnAckedMessages(String subscriptionId) throws IOException {
try (MetricServiceClient metricServiceClient = MetricServiceClient.create()) {
MetricServiceClient.ListTimeSeriesPagedResponse response = metricServiceClient.listTimeSeries(
ListTimeSeriesRequest.newBuilder()
.setName(ProjectName.of(ServiceOptions.getDefaultProjectId()).toString())
.setFilter(String.format(METRIC_FILTER_PATTERN, NUM_UNDELIVERED_MESSAGES, subscriptionId))
.setInterval(TimeInterval.newBuilder()
.setStartTime(Timestamps.fromSeconds(Instant.now().getEpochSecond() - TimeUnit.MINUTES.toSeconds(2)))
.setEndTime(Timestamps.fromSeconds(Instant.now().getEpochSecond()))
.build())
.build());
// use the latest value from the window
List<Point> pointList = response.getPage().getValues().iterator().next().getPointsList();
return pointList.stream().findFirst().map(point -> point.getValue().getInt64Value()).orElse(Long.MAX_VALUE);
}
}
}
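
A small sketch (hypothetical subscription id) that only prints the Cloud Monitoring filter built by getNumUnAckedMessages: the method reads the subscription's num_undelivered_messages metric over the trailing two minutes and uses the most recent point as the backlog estimate.

public class MetricFilterSketch {
  public static void main(String[] args) {
    String pattern = "metric.type=\"pubsub.googleapis.com/subscription/%s\" AND resource.label.subscription_id=\"%s\"";
    // Resolves to the filter sent to the Cloud Monitoring ListTimeSeries API.
    String filter = String.format(pattern, "num_undelivered_messages", "my-gcs-events-subscription");
    System.out.println(filter);
  }
}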
