Skip to content

Commit 9b9ee0b

Browse files
authored
Merge pull request GoogleCloudPlatform#547 from sammcveety/fix_pubsub_inprocess_null_topic
Fix InProcessPipelineRunner to handle a null subscription
2 parents b4e391e + d7a70fe commit 9b9ee0b

File tree

3 files changed

+63
-6
lines changed

3 files changed

+63
-6
lines changed

sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSource.java

Lines changed: 6 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,7 @@
3030
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
3131
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
3232
import com.google.cloud.dataflow.sdk.options.ValueProvider;
33+
import com.google.cloud.dataflow.sdk.options.ValueProvider.StaticValueProvider;
3334
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
3435
import com.google.cloud.dataflow.sdk.transforms.Combine;
3536
import com.google.cloud.dataflow.sdk.transforms.DoFn;
@@ -1290,6 +1291,7 @@ public String getIdLabel() {
12901291

12911292
@Override
12921293
public PCollection<T> apply(PBegin input) {
1294+
ValueProvider<SubscriptionPath> subscriptionPath = subscription;
12931295
if (subscription == null) {
12941296
try {
12951297
try (PubsubClient pubsubClient =
@@ -1299,9 +1301,8 @@ public PCollection<T> apply(PBegin input) {
12991301
.as(DataflowPipelineOptions.class))) {
13001302
checkState(project.isAccessible(), "createRandomSubscription must be called at runtime.");
13011303
checkState(topic.isAccessible(), "createRandomSubscription must be called at runtime.");
1302-
SubscriptionPath subscriptionPath =
1303-
pubsubClient.createRandomSubscription(
1304-
project.get(), topic.get(), DEAULT_ACK_TIMEOUT_SEC);
1304+
subscriptionPath = StaticValueProvider.of(pubsubClient.createRandomSubscription(
1305+
project.get(), topic.get(), DEAULT_ACK_TIMEOUT_SEC));
13051306
LOG.warn("Created subscription {} to topic {}."
13061307
+ " Note this subscription WILL NOT be deleted when the pipeline terminates",
13071308
subscription, topic);
@@ -1314,7 +1315,7 @@ public PCollection<T> apply(PBegin input) {
13141315
return input.getPipeline().begin()
13151316
.apply(Read.from(new PubsubSource<T>(this)))
13161317
.apply(ParDo.named("PubsubUnboundedSource.Stats")
1317-
.of(new StatsFn<T>(pubsubFactory, subscription,
1318-
timestampLabel, idLabel)));
1318+
.of(new StatsFn<T>(pubsubFactory, checkNotNull(subscriptionPath),
1319+
timestampLabel, idLabel)));
13191320
}
13201321
}

sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PubsubTestClient.java

Lines changed: 40 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -107,6 +107,11 @@ private static class State {
107107
*/
108108
@Nullable
109109
Map<String, Long> ackDeadline;
110+
111+
/**
112+
* Whether a subscription has been created.
113+
*/
114+
boolean createdSubscription;
110115
}
111116

112117
private static final State STATE = new State();
@@ -124,12 +129,40 @@ public static PubsubTestClientFactory createFactoryForPublish(
124129
final TopicPath expectedTopic,
125130
final Iterable<OutgoingMessage> expectedOutgoingMessages,
126131
final Iterable<OutgoingMessage> failingOutgoingMessages) {
132+
return createFactoryForPublishInternal(
133+
expectedTopic, expectedOutgoingMessages, failingOutgoingMessages, false);
134+
}
135+
136+
/**
137+
* Return a factory for testing publishers. Only one factory may be in-flight at a time.
138+
* The factory must be closed when the test is complete, at which point final validation will
139+
* occur. Additionally, closing the factory verifies that createSubscription was called.
140+
*/
141+
public static PubsubTestClientFactory createFactoryForPublishVerifySubscription(
142+
final TopicPath expectedTopic,
143+
final Iterable<OutgoingMessage> expectedOutgoingMessages,
144+
final Iterable<OutgoingMessage> failingOutgoingMessages) {
145+
return createFactoryForPublishInternal(
146+
expectedTopic, expectedOutgoingMessages, failingOutgoingMessages, true);
147+
}
148+
149+
/**
150+
* Return a factory for testing publishers. Only one factory may be in-flight at a time.
151+
* The factory must be closed when the test is complete, at which point final validation will
152+
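* occur. If {@code verifySubscriptionCreated} is true, closing the factory also checks that createSubscription was called.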
153+
*/
154+
public static PubsubTestClientFactory createFactoryForPublishInternal(
155+
final TopicPath expectedTopic,
156+
final Iterable<OutgoingMessage> expectedOutgoingMessages,
157+
final Iterable<OutgoingMessage> failingOutgoingMessages,
158+
final boolean verifySubscriptionCreated) {
127159
synchronized (STATE) {
128160
checkState(!STATE.isActive, "Test still in flight");
129161
STATE.expectedTopic = expectedTopic;
130162
STATE.remainingExpectedOutgoingMessages = Sets.newHashSet(expectedOutgoingMessages);
131163
STATE.remainingFailingOutgoingMessages = Sets.newHashSet(failingOutgoingMessages);
132164
STATE.isActive = true;
165+
STATE.createdSubscription = false;
133166
}
134167
return new PubsubTestClientFactory() {
135168
@Override
@@ -148,6 +181,9 @@ public String getKind() {
148181
@Override
149182
public void close() {
150183
synchronized (STATE) {
184+
if (verifySubscriptionCreated) {
185+
checkState(STATE.createdSubscription, "Did not call create subscription");
186+
}
151187
checkState(STATE.isActive, "No test still in flight");
152188
checkState(STATE.remainingExpectedOutgoingMessages.isEmpty(),
153189
"Still waiting for %s messages to be published",
@@ -372,7 +408,10 @@ public List<TopicPath> listTopics(ProjectPath project) throws IOException {
372408
@Override
373409
public void createSubscription(
374410
TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {
375-
throw new UnsupportedOperationException();
411+
synchronized (STATE) {
412+
STATE.createdSubscription = true;
413+
}
414+
return;
376415
}
377416

378417
@Override

sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSourceTest.java

Lines changed: 17 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,10 @@
3636
import com.google.cloud.dataflow.sdk.util.CoderUtils;
3737
import com.google.cloud.dataflow.sdk.util.PubsubClient;
3838
import com.google.cloud.dataflow.sdk.util.PubsubClient.IncomingMessage;
39+
import com.google.cloud.dataflow.sdk.util.PubsubClient.OutgoingMessage;
40+
import com.google.cloud.dataflow.sdk.util.PubsubClient.ProjectPath;
3941
import com.google.cloud.dataflow.sdk.util.PubsubClient.SubscriptionPath;
42+
import com.google.cloud.dataflow.sdk.util.PubsubClient.TopicPath;
4043
import com.google.cloud.dataflow.sdk.util.PubsubTestClient;
4144
import com.google.cloud.dataflow.sdk.util.PubsubTestClient.PubsubTestClientFactory;
4245

@@ -60,8 +63,12 @@
6063
*/
6164
@RunWith(JUnit4.class)
6265
public class PubsubUnboundedSourceTest {
66+
private static final ProjectPath PROJECT =
67+
PubsubClient.projectPathFromId("testProject");
6368
private static final SubscriptionPath SUBSCRIPTION =
6469
PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
70+
private static final TopicPath TOPIC =
71+
PubsubClient.topicPathFromName("testProject", "testTopic");
6572
private static final String DATA = "testData";
6673
private static final long TIMESTAMP = 1234L;
6774
private static final long REQ_TIME = 6373L;
@@ -320,4 +327,14 @@ public void readManyMessages() throws IOException {
320327
assertTrue(dataToMessageNum.isEmpty());
321328
reader.close();
322329
}
330+
331+
@Test
332+
public void testNullSubscription() throws Exception {
333+
factory = PubsubTestClient.createFactoryForPublishVerifySubscription(
334+
TOPIC, ImmutableList.<OutgoingMessage>of(), ImmutableList.<OutgoingMessage>of());
335+
TestPipeline p = TestPipeline.create();
336+
p.apply(new PubsubUnboundedSource<>(
337+
clock, factory, StaticValueProvider.of(PROJECT), StaticValueProvider.of(TOPIC),
338+
null, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL));
339+
}
323340
}

0 commit comments

Comments
 (0)