Skip to content

Commit 2ea5a23

Browse files
committed
Fix InProcessPipelineRunner to handle a null subscription
1 parent b4e391e commit 2ea5a23

File tree

3 files changed

+24
-6
lines changed

3 files changed

+24
-6
lines changed

sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSource.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
3131
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
3232
import com.google.cloud.dataflow.sdk.options.ValueProvider;
33+
import com.google.cloud.dataflow.sdk.options.ValueProvider.StaticValueProvider;
3334
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
3435
import com.google.cloud.dataflow.sdk.transforms.Combine;
3536
import com.google.cloud.dataflow.sdk.transforms.DoFn;
@@ -1290,6 +1291,7 @@ public String getIdLabel() {
12901291

12911292
@Override
12921293
public PCollection<T> apply(PBegin input) {
1294+
ValueProvider<SubscriptionPath> subscriptionPath = subscription;
12931295
if (subscription == null) {
12941296
try {
12951297
try (PubsubClient pubsubClient =
@@ -1299,9 +1301,8 @@ public PCollection<T> apply(PBegin input) {
12991301
.as(DataflowPipelineOptions.class))) {
13001302
checkState(project.isAccessible(), "createRandomSubscription must be called at runtime.");
13011303
checkState(topic.isAccessible(), "createRandomSubscription must be called at runtime.");
1302-
SubscriptionPath subscriptionPath =
1303-
pubsubClient.createRandomSubscription(
1304-
project.get(), topic.get(), DEAULT_ACK_TIMEOUT_SEC);
1304+
subscriptionPath = StaticValueProvider.of(pubsubClient.createRandomSubscription(
1305+
project.get(), topic.get(), DEAULT_ACK_TIMEOUT_SEC));
13051306
LOG.warn("Created subscription {} to topic {}."
13061307
+ " Note this subscription WILL NOT be deleted when the pipeline terminates",
13071308
subscription, topic);
@@ -1314,7 +1315,7 @@ public PCollection<T> apply(PBegin input) {
13141315
return input.getPipeline().begin()
13151316
.apply(Read.from(new PubsubSource<T>(this)))
13161317
.apply(ParDo.named("PubsubUnboundedSource.Stats")
1317-
.of(new StatsFn<T>(pubsubFactory, subscription,
1318-
timestampLabel, idLabel)));
1318+
.of(new StatsFn<T>(pubsubFactory, checkNotNull(subscriptionPath),
1319+
timestampLabel, idLabel)));
13191320
}
13201321
}

sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PubsubTestClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ public List<TopicPath> listTopics(ProjectPath project) throws IOException {
372372
@Override
373373
public void createSubscription(
374374
TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {
375-
throw new UnsupportedOperationException();
375+
return;
376376
}
377377

378378
@Override

sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSourceTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,10 @@
3636
import com.google.cloud.dataflow.sdk.util.CoderUtils;
3737
import com.google.cloud.dataflow.sdk.util.PubsubClient;
3838
import com.google.cloud.dataflow.sdk.util.PubsubClient.IncomingMessage;
39+
import com.google.cloud.dataflow.sdk.util.PubsubClient.OutgoingMessage;
40+
import com.google.cloud.dataflow.sdk.util.PubsubClient.ProjectPath;
3941
import com.google.cloud.dataflow.sdk.util.PubsubClient.SubscriptionPath;
42+
import com.google.cloud.dataflow.sdk.util.PubsubClient.TopicPath;
4043
import com.google.cloud.dataflow.sdk.util.PubsubTestClient;
4144
import com.google.cloud.dataflow.sdk.util.PubsubTestClient.PubsubTestClientFactory;
4245

@@ -60,8 +63,12 @@
6063
*/
6164
@RunWith(JUnit4.class)
6265
public class PubsubUnboundedSourceTest {
66+
private static final ProjectPath PROJECT =
67+
PubsubClient.projectPathFromId("testProject");
6368
private static final SubscriptionPath SUBSCRIPTION =
6469
PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
70+
private static final TopicPath TOPIC =
71+
PubsubClient.topicPathFromName("testProject", "testTopic");
6572
private static final String DATA = "testData";
6673
private static final long TIMESTAMP = 1234L;
6774
private static final long REQ_TIME = 6373L;
@@ -320,4 +327,14 @@ public void readManyMessages() throws IOException {
320327
assertTrue(dataToMessageNum.isEmpty());
321328
reader.close();
322329
}
330+
331+
@Test
332+
public void testNullTopic() throws Exception {
333+
factory = PubsubTestClient.createFactoryForPublish(
334+
TOPIC, ImmutableList.<OutgoingMessage>of(), ImmutableList.<OutgoingMessage>of());
335+
TestPipeline p = TestPipeline.create();
336+
p.apply(new PubsubUnboundedSource<>(
337+
clock, factory, StaticValueProvider.of(PROJECT), StaticValueProvider.of(TOPIC),
338+
null, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL));
339+
}
323340
}

0 commit comments

Comments
 (0)