Skip to content

Commit d7a70fe

Browse files
committed
Fixups
1 parent 20862aa commit d7a70fe

File tree

2 files changed

+40
-1
lines changed

2 files changed

+40
-1
lines changed

sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PubsubTestClient.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,11 @@ private static class State {
107107
*/
108108
@Nullable
109109
Map<String, Long> ackDeadline;
110+
111+
/**
112+
* Whether a subscription has been created.
113+
*/
114+
boolean createdSubscription;
110115
}
111116

112117
private static final State STATE = new State();
@@ -124,12 +129,40 @@ public static PubsubTestClientFactory createFactoryForPublish(
124129
final TopicPath expectedTopic,
125130
final Iterable<OutgoingMessage> expectedOutgoingMessages,
126131
final Iterable<OutgoingMessage> failingOutgoingMessages) {
132+
return createFactoryForPublishInternal(
133+
expectedTopic, expectedOutgoingMessages, failingOutgoingMessages, false);
134+
}
135+
136+
/**
137+
* Return a factory for testing publishers. Only one factory may be in-flight at a time.
138+
* The factory must be closed when the test is complete, at which point final validation will
139+
* occur. Additionally, verify that createSubscription was called.
140+
*/
141+
public static PubsubTestClientFactory createFactoryForPublishVerifySubscription(
142+
final TopicPath expectedTopic,
143+
final Iterable<OutgoingMessage> expectedOutgoingMessages,
144+
final Iterable<OutgoingMessage> failingOutgoingMessages) {
145+
return createFactoryForPublishInternal(
146+
expectedTopic, expectedOutgoingMessages, failingOutgoingMessages, true);
147+
}
148+
149+
/**
150+
* Return a factory for testing publishers. Only one factory may be in-flight at a time.
151+
* The factory must be closed when the test is complete, at which point final validation will
152+
* occur.
153+
*/
154+
public static PubsubTestClientFactory createFactoryForPublishInternal(
155+
final TopicPath expectedTopic,
156+
final Iterable<OutgoingMessage> expectedOutgoingMessages,
157+
final Iterable<OutgoingMessage> failingOutgoingMessages,
158+
final boolean verifySubscriptionCreated) {
127159
synchronized (STATE) {
128160
checkState(!STATE.isActive, "Test still in flight");
129161
STATE.expectedTopic = expectedTopic;
130162
STATE.remainingExpectedOutgoingMessages = Sets.newHashSet(expectedOutgoingMessages);
131163
STATE.remainingFailingOutgoingMessages = Sets.newHashSet(failingOutgoingMessages);
132164
STATE.isActive = true;
165+
STATE.createdSubscription = false;
133166
}
134167
return new PubsubTestClientFactory() {
135168
@Override
@@ -148,6 +181,9 @@ public String getKind() {
148181
@Override
149182
public void close() {
150183
synchronized (STATE) {
184+
if (verifySubscriptionCreated) {
185+
checkState(STATE.createdSubscription, "Did not call create subscription");
186+
}
151187
checkState(STATE.isActive, "No test still in flight");
152188
checkState(STATE.remainingExpectedOutgoingMessages.isEmpty(),
153189
"Still waiting for %s messages to be published",
@@ -372,6 +408,9 @@ public List<TopicPath> listTopics(ProjectPath project) throws IOException {
372408
@Override
373409
public void createSubscription(
374410
TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {
411+
synchronized (STATE) {
412+
STATE.createdSubscription = true;
413+
}
375414
return;
376415
}
377416

sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSourceTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ public void readManyMessages() throws IOException {
330330

331331
@Test
332332
public void testNullSubscription() throws Exception {
333-
factory = PubsubTestClient.createFactoryForPublish(
333+
factory = PubsubTestClient.createFactoryForPublishVerifySubscription(
334334
TOPIC, ImmutableList.<OutgoingMessage>of(), ImmutableList.<OutgoingMessage>of());
335335
TestPipeline p = TestPipeline.create();
336336
p.apply(new PubsubUnboundedSource<>(

0 commit comments

Comments
 (0)