Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.pubsub;

import com.google.auto.value.AutoValue;
import com.google.pubsub.v1.PubsubMessage;
import org.apache.beam.sdk.coders.DefaultCoder;

/** A message received from Pubsub. */
@AutoValue
@DefaultCoder(IncomingMessageCoder.class)
abstract class IncomingMessage {

/** Underlying Message. */
public abstract com.google.pubsub.v1.PubsubMessage message();

/**
* Timestamp for element (ms since epoch). Either Pubsub's processing time, or the custom
* timestamp associated with the message.
*/
public abstract long timestampMsSinceEpoch();

/** Timestamp (in system time) at which we requested the message (ms since epoch). */
public abstract long requestTimeMsSinceEpoch();

/** Id to pass back to Pubsub to acknowledge receipt of this message. */
public abstract String ackId();

/** Id to pass to the runner to distinguish this message from all others. */
public abstract String recordId();

public static IncomingMessage of(
PubsubMessage message,
long timestampMsSinceEpoch,
long requestTimeMsSinceEpoch,
String ackId,
String recordId) {
return new AutoValue_IncomingMessage(
message, timestampMsSinceEpoch, requestTimeMsSinceEpoch, ackId, recordId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.pubsub;

import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.coders.CoderProvider;
import org.apache.beam.sdk.coders.CoderProviders;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.values.TypeDescriptor;

/** A coder for IncomingMessage. */
public class IncomingMessageCoder extends CustomCoder<IncomingMessage> {
public static CoderProvider getCoderProvider() {
return CoderProviders.forCoder(
TypeDescriptor.of(IncomingMessage.class), new IncomingMessageCoder());
}

public static IncomingMessageCoder of() {
return new IncomingMessageCoder();
}

@Override
public void encode(IncomingMessage value, OutputStream outStream) throws IOException {
ProtoCoder.of(PubsubMessage.class).encode(value.message(), outStream);
VarLongCoder.of().encode(value.timestampMsSinceEpoch(), outStream);
VarLongCoder.of().encode(value.requestTimeMsSinceEpoch(), outStream);
StringUtf8Coder.of().encode(value.ackId(), outStream);
StringUtf8Coder.of().encode(value.recordId(), outStream);
}

@Override
public IncomingMessage decode(InputStream inStream) throws IOException {
return IncomingMessage.of(
ProtoCoder.of(PubsubMessage.class).decode(inStream),
VarLongCoder.of().decode(inStream),
VarLongCoder.of().decode(inStream),
StringUtf8Coder.of().decode(inStream),
StringUtf8Coder.of().decode(inStream));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.pubsub;

import com.google.auto.value.AutoValue;
import com.google.protobuf.ByteString;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.DefaultCoder;

/** A message to be sent to Pubsub. */
@AutoValue
@DefaultCoder(OutgoingMessageCoder.class)
abstract class OutgoingMessage {

/** Underlying Message. May not have publish timestamp set. */
public abstract com.google.pubsub.v1.PubsubMessage message();

/** Timestamp for element (ms since epoch). */
public abstract long timestampMsSinceEpoch();

/**
* If using an id attribute, the record id to associate with this record's metadata so the
* receiver can reject duplicates. Otherwise {@literal null}.
*/
@Nullable
public abstract String recordId();

public static OutgoingMessage of(
com.google.pubsub.v1.PubsubMessage message,
long timestampMsSinceEpoch,
@Nullable String recordId) {
return new AutoValue_OutgoingMessage(message, timestampMsSinceEpoch, recordId);
}

public static OutgoingMessage of(
PubsubMessage message, long timestampMsSinceEpoch, @Nullable String recordId) {
com.google.pubsub.v1.PubsubMessage.Builder builder =
com.google.pubsub.v1.PubsubMessage.newBuilder()
.setData(ByteString.copyFrom(message.getPayload()));
if (message.getAttributeMap() != null) {
builder.putAllAttributes(message.getAttributeMap());
}
return of(builder.build(), timestampMsSinceEpoch, recordId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.pubsub;

import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.coders.CoderProvider;
import org.apache.beam.sdk.coders.CoderProviders;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.values.TypeDescriptor;

/** A coder for OutgoingMessage. */
public class OutgoingMessageCoder extends CustomCoder<OutgoingMessage> {
public static CoderProvider getCoderProvider() {
return CoderProviders.forCoder(
TypeDescriptor.of(OutgoingMessage.class), new OutgoingMessageCoder());
}

public static OutgoingMessageCoder of() {
return new OutgoingMessageCoder();
}

@Override
public void encode(OutgoingMessage value, OutputStream outStream) throws IOException {
ProtoCoder.of(PubsubMessage.class).encode(value.message(), outStream);
VarLongCoder.of().encode(value.timestampMsSinceEpoch(), outStream);
NullableCoder.of(StringUtf8Coder.of()).encode(value.recordId(), outStream);
}

@Override
public OutgoingMessage decode(InputStream inStream) throws IOException {
return OutgoingMessage.of(
ProtoCoder.of(PubsubMessage.class).decode(inStream),
VarLongCoder.of().decode(inStream),
NullableCoder.of(StringUtf8Coder.of()).decode(inStream));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.pubsub;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;

import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.naming.SizeLimitExceededException;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.PubsubTopic;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;

/** Writer to Pubsub which batches messages from bounded collections. */
class PubsubBoundedWriter<T> extends DoFn<T, Void> {
/**
* Max batch byte size. Messages are base64 encoded which encodes each set of three bytes into
* four bytes.
*/
private static final int MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT = ((10 * 1000 * 1000) / 4) * 3;

private static final int MAX_PUBLISH_BATCH_SIZE = 100;

private transient List<OutgoingMessage> output;
private transient PubsubClient pubsubClient;
private transient int currentOutputBytes;

private final int maxPublishBatchByteSize;
private final int maxPublishBatchSize;
private final PubsubIO.Write<T> write;

private PubsubBoundedWriter(PubsubIO.Write<T> write) {
Preconditions.checkNotNull(write.getTopicProvider());
this.maxPublishBatchSize =
MoreObjects.firstNonNull(write.getMaxBatchSize(), MAX_PUBLISH_BATCH_SIZE);
this.maxPublishBatchByteSize =
MoreObjects.firstNonNull(write.getMaxBatchBytesSize(), MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT);
this.write = write;
}

static <T> PubsubBoundedWriter<T> of(PubsubIO.Write<T> write) {
return new PubsubBoundedWriter<>(write);
}

@StartBundle
public void startBundle(StartBundleContext c) throws IOException {
this.output = new ArrayList<>();
this.currentOutputBytes = 0;

// NOTE: idAttribute is ignored.
this.pubsubClient =
write
.getPubsubClientFactory()
.newClient(
write.getTimestampAttribute(),
null,
c.getPipelineOptions().as(PubsubOptions.class));
}

@ProcessElement
public void processElement(ProcessContext c) throws IOException, SizeLimitExceededException {
byte[] payload;
PubsubMessage message = write.getFormatFn().apply(c.element());
payload = message.getPayload();
Map<String, String> attributes = message.getAttributeMap();

if (payload.length > maxPublishBatchByteSize) {
String msg =
String.format(
"Pub/Sub message size (%d) exceeded maximum batch size (%d)",
payload.length, maxPublishBatchByteSize);
throw new SizeLimitExceededException(msg);
}

// Checking before adding the message stops us from violating the max bytes
if (((currentOutputBytes + payload.length) >= maxPublishBatchByteSize)
|| (output.size() >= maxPublishBatchSize)) {
publish();
}

// NOTE: The record id is always null.
output.add(
OutgoingMessage.of(
com.google.pubsub.v1.PubsubMessage.newBuilder()
.setData(ByteString.copyFrom(payload))
.putAllAttributes(attributes)
.build(),
c.timestamp().getMillis(),
null));
currentOutputBytes += payload.length;
}

@FinishBundle
public void finishBundle() throws IOException {
if (!output.isEmpty()) {
publish();
}
output = null;
currentOutputBytes = 0;
pubsubClient.close();
pubsubClient = null;
}

private void publish() throws IOException {
PubsubTopic topic = write.getTopicProvider().get();
int n = pubsubClient.publish(PubsubClient.topicPathFromPath(topic.asPath()), output);
checkState(n == output.size());
output.clear();
currentOutputBytes = 0;
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder.delegate(write);
}
}
Loading