Skip to content

Commit

Permalink
Service Bus ARM template for live tests (#9978)
Browse files Browse the repository at this point in the history
* Make .json files two-space indents, so they're not so hard to view.

* Adding Service Bus ARM template.

* Clean up tests.yml and environment variables.

* Fixing ProxyReceive and send tests.
  • Loading branch information
conniey authored Apr 9, 2020
1 parent 5ed8941 commit 687ca03
Show file tree
Hide file tree
Showing 8 changed files with 379 additions and 222 deletions.
3 changes: 3 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ trim_trailing_whitespace = true
insert_final_newline = true
max_line_length = 120

[*.json]
indent_size = 2

# From https://maven.apache.org/developers/conventions/code.html
[*.xml]
indent_size = 2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,9 +392,15 @@ private Disposable getRenewLockOperation(ServiceBusReceivedMessage message, Inst
"Cannot renew lock token without a value for 'message.getLockedUntil()'"));
}

final Duration initialInterval = Duration.between(Instant.now(), initialLockedUntil);
final Instant initialRefreshDuration = initialLockedUntil.minus(Duration.ofMillis(500));
Duration initialInterval = Duration.between(Instant.now(), initialRefreshDuration);
if (initialInterval.isNegative()) {
logger.info("Duration was negative. Moving to refresh immediately: {}", initialInterval.toMillis());
initialInterval = Duration.ZERO;
}

logger.info("lock[{}]. lockedUntil[{}]. interval[{}]", lockToken, initialLockedUntil, initialInterval);
logger.info("lock[{}]. lockedUntil[{}]. firstInterval[{}]. interval[{}]", lockToken, initialLockedUntil,
initialRefreshDuration, initialInterval);

final EmitterProcessor<Duration> emitterProcessor = EmitterProcessor.create();
final FluxSink<Duration> sink = emitterProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@
import static com.azure.core.amqp.ProxyOptions.PROXY_USERNAME;

public abstract class IntegrationTestBase extends TestBase {
protected static final Duration TIMEOUT = Duration.ofSeconds(50);
protected static final Duration TIMEOUT = Duration.ofSeconds(120);
protected static final AmqpRetryOptions RETRY_OPTIONS = new AmqpRetryOptions().setTryTimeout(TIMEOUT);
protected final ClientLogger logger;

private static final String PROXY_AUTHENTICATION_TYPE = "PROXY_AUTHENTICATION_TYPE";
private static final String AZURE_SERVICEBUS_CONNECTION_STRING = "AZURE_SERVICEBUS_CONNECTION_STRING";
private static final String AZURE_SERVICEBUS_CONNECTION_STRING = "AZURE_SERVICEBUS_NAMESPACE_CONNECTION_STRING";

private static final String AZURE_SERVICEBUS_FULLY_QUALIFIED_DOMAIN_NAME = "AZURE_SERVICEBUS_FULLY_QUALIFIED_DOMAIN_NAME";
private static final String AZURE_SERVICEBUS_QUEUE_NAME = "AZURE_SERVICEBUS_QUEUE_NAME";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.jproxy.ProxyServer;
import com.azure.messaging.servicebus.jproxy.SimpleProxy;
import com.azure.messaging.servicebus.models.ReceiveAsyncOptions;
import com.azure.messaging.servicebus.models.ReceiveMode;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.test.StepVerifier;

Expand All @@ -19,6 +20,7 @@
import java.net.ProxySelector;
import java.net.SocketAddress;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
Expand All @@ -27,8 +29,8 @@
* Verify we can use jproxy hosted locally to receive messages.
*/
public class ProxyReceiveTest extends IntegrationTestBase {
private static final int PROXY_PORT = 9340;
private static final int NUMBER_OF_EVENTS = 25;
private static final int PROXY_PORT = 9102;
private static final int NUMBER_OF_EVENTS = 10;

private static ProxyServer proxyServer;
private static ProxySelector defaultProxySelector;
Expand All @@ -37,10 +39,12 @@ public ProxyReceiveTest() {
super(new ClientLogger(ProxyReceiveTest.class));
}

@BeforeAll
public static void setup() throws IOException {
@BeforeEach
public void setup() throws IOException {
StepVerifier.setDefaultTimeout(Duration.ofSeconds(30));

proxyServer = new SimpleProxy(PROXY_PORT);
proxyServer.start(null);
proxyServer.start(error -> logger.error("Exception occurred in proxy.", error));

defaultProxySelector = ProxySelector.getDefault();
ProxySelector.setDefault(new ProxySelector() {
Expand All @@ -58,8 +62,10 @@ public void connectFailed(URI uri, SocketAddress sa, IOException ioe) {
});
}

@AfterAll()
public static void cleanup() throws Exception {
@AfterEach()
public void cleanup() throws Exception {
StepVerifier.resetDefaultTimeout();

if (proxyServer != null) {
proxyServer.stop();
}
Expand All @@ -68,27 +74,33 @@ public static void cleanup() throws Exception {
}

@Test
public void testReceiverStartOfStreamFilters() {
public void receiveMessage() {
// Arrange
final String queueName = getQueueName();

Assertions.assertNotNull(queueName, "'queueName' is not set in environment variable.");

final String messageTracking = UUID.randomUUID().toString();
final ServiceBusClientBuilder builder = new ServiceBusClientBuilder()
.transportType(AmqpTransportType.AMQP_WEB_SOCKETS)
.connectionString(getConnectionString());

final List<ServiceBusMessage> messages = TestUtils.getServiceBusMessages(NUMBER_OF_EVENTS, messageTracking);
final ServiceBusSenderAsyncClient sender = builder.sender()
final ServiceBusSenderAsyncClient sender = new ServiceBusClientBuilder()
.transportType(AmqpTransportType.AMQP_WEB_SOCKETS)
.connectionString(getConnectionString())
.sender()
.queueName(queueName)
.buildAsyncClient();

final ServiceBusReceiverAsyncClient receiver = builder.receiver()
final ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
.transportType(AmqpTransportType.AMQP_WEB_SOCKETS)
.connectionString(getConnectionString())
.receiver()
.receiveMode(ReceiveMode.RECEIVE_AND_DELETE)
.queueName(queueName)
.buildAsyncClient();

final ReceiveAsyncOptions options = new ReceiveAsyncOptions()
.setEnableAutoComplete(false);

// Act & Assert
try {
StepVerifier.create(sender.createBatch()
Expand All @@ -101,7 +113,7 @@ public void testReceiverStartOfStreamFilters() {
}))
.verifyComplete();

StepVerifier.create(receiver.receive().take(NUMBER_OF_EVENTS))
StepVerifier.create(receiver.receive(options).take(NUMBER_OF_EVENTS))
.expectNextCount(NUMBER_OF_EVENTS)
.expectComplete()
.verify(TIMEOUT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.jproxy.ProxyServer;
import com.azure.messaging.servicebus.jproxy.SimpleProxy;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.test.StepVerifier;

Expand All @@ -35,13 +35,12 @@ public ProxySendTest() {
super(new ClientLogger(ProxySendTest.class));
}

@BeforeAll
public static void initialize() throws Exception {
@BeforeEach
public void initialize() throws Exception {
StepVerifier.setDefaultTimeout(Duration.ofSeconds(30));

proxyServer = new SimpleProxy(PROXY_PORT);
proxyServer.start(t -> {
});
proxyServer.start(error -> logger.error("Exception occurred in proxy.", error));

defaultProxySelector = ProxySelector.getDefault();
ProxySelector.setDefault(new ProxySelector() {
Expand All @@ -57,15 +56,15 @@ public void connectFailed(URI uri, SocketAddress sa, IOException ioe) {
});
}

@AfterAll
public static void cleanupClient() throws Exception {
@AfterEach
public void cleanup() throws Exception {
StepVerifier.resetDefaultTimeout();

ProxySelector.setDefault(defaultProxySelector);

if (proxyServer != null) {
proxyServer.stop();
}

ProxySelector.setDefault(defaultProxySelector);
}

/**
Expand Down
Loading

0 comments on commit 687ca03

Please sign in to comment.