Skip to content

Commit

Permalink
Adds support for Event Hubs on v2 stack (Azure#41646)
Browse files Browse the repository at this point in the history
* [Event hubs] V2 stack onboarding (Azure#40435)

* Defining the opt-in flags for event hubs clients on v2 stack, and placeholder for v2 connection cache

* move v2 support class outside the builder

* Init V2StackSupport in builder to see if spring fails

* Adding ConnectionCacheWrapper to delegate to v1 EventHubConnectionProcessor or v2 ReactorConnectionCache, EventHubReactorSession to inspect v2 and create v2 ReceiveLinkHandler2 and update V2StackSupport to create ReactorConnectionCache

* use unreleased azure-core-amqp

* Integrating the MessageFlux to EventHubsPartitionAsyncConsumer

* Integrating the WindowedSubscriber to EventHubConsumerClient

* Wire the v2 opt-in in EventHubClientBuilder.

* remove unused import in EventHubConsumerAsyncClientTest

* Use released version of azure-core-amqp, update changelog about v2 stack integration

* Prepare beta release (Azure#41586)

* Increment package versions for eventhubs releases (Azure#41595)

---------

Co-authored-by: Anu Thomas Chandy <anuamd@hotmail.com>
Co-authored-by: Azure SDK Bot <53356347+azure-sdk@users.noreply.github.com>
  • Loading branch information
3 people authored Aug 26, 2024
1 parent ac8a9e3 commit 4e61bd6
Show file tree
Hide file tree
Showing 29 changed files with 672 additions and 127 deletions.
4 changes: 2 additions & 2 deletions eng/versioning/version_client.txt
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ com.azure:azure-media-videoanalyzer-edge;1.0.0-beta.6;1.0.0-beta.7
com.azure:azure-messaging-eventgrid;4.24.0;4.25.0-beta.1
com.azure:azure-messaging-eventgrid-namespaces;1.0.1;1.1.0-beta.1
com.azure:azure-messaging-eventgrid-cloudnative-cloudevents;1.0.0-beta.1;1.0.0-beta.2
com.azure:azure-messaging-eventhubs;5.18.7;5.19.0-beta.2
com.azure:azure-messaging-eventhubs-checkpointstore-blob;1.19.7;1.20.0-beta.2
com.azure:azure-messaging-eventhubs;5.18.7;5.19.0-beta.3
com.azure:azure-messaging-eventhubs-checkpointstore-blob;1.19.7;1.20.0-beta.3
com.azure:azure-messaging-eventhubs-checkpointstore-jedis;1.0.0-beta.3;1.0.0-beta.4
com.azure:azure-messaging-eventhubs-stress;1.0.0-beta.1;1.0.0-beta.1
com.azure:azure-messaging-eventhubs-track1-perf;1.0.0-beta.1;1.0.0-beta.1
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Release History

## 1.20.0-beta.2 (Unreleased)
## 1.20.0-beta.3 (Unreleased)

### Features Added

Expand All @@ -10,31 +10,31 @@

### Other Changes

## 1.19.6 (2024-07-26)
## 1.20.0-beta.2 (2024-08-20)

### Other Changes

#### Dependency Updates

- Upgraded `azure-messaging-eventhubs` from `5.18.5` to version `5.18.6`.
- Upgraded `azure-messaging-eventhubs` from `5.18.6` to version `5.19.0-beta.2`.
- Upgraded `azure-storage-blob` from `12.26.1` to version `12.27.0`.

## 1.19.5 (2024-06-24)
## 1.19.6 (2024-07-26)

### Other Changes

#### Dependency Updates

- Upgraded `azure-messaging-eventhubs` from `5.18.4` to version `5.18.5`.
- Upgraded `azure-storage-blob` from `12.26.0` to version `12.26.1`.
- Upgraded `azure-messaging-eventhubs` from `5.18.5` to version `5.18.6`.

## 1.19.3 (2024-05-28)
## 1.19.5 (2024-06-24)

### Other Changes

#### Dependency Updates

- Upgraded `azure-messaging-eventhubs` from `5.18.3` to version `5.18.4`.
- Upgraded `azure-storage-blob` from `12.25.3` to version `12.26.0`.
- Upgraded `azure-messaging-eventhubs` from `5.18.4` to version `5.18.5`.
- Upgraded `azure-storage-blob` from `12.26.0` to version `12.26.1`.

## 1.20.0-beta.1 (2024-05-21)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ add the direct dependency to your project as follows.
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>1.19.6</version>
<version>1.20.0-beta.2</version>
</dependency>
```
[//]: # ({x-version-update-end})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>1.20.0-beta.2</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs-checkpointstore-blob;current} -->
<version>1.20.0-beta.3</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs-checkpointstore-blob;current} -->

<name>Microsoft Azure client library for storing checkpoints in Storage Blobs</name>
<description>Library for using storing checkpoints in Storage Blobs</description>
Expand Down Expand Up @@ -49,7 +49,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.19.0-beta.2</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs;current} -->
<version>5.19.0-beta.3</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs;current} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.19.0-beta.2</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs;current} -->
<version>5.19.0-beta.3</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs;current} -->
</dependency>
<dependency>
<groupId>redis.clients</groupId>
Expand Down
4 changes: 2 additions & 2 deletions sdk/eventhubs/azure-messaging-eventhubs-stress/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.19.0-beta.2</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs;current} -->
<version>5.19.0-beta.3</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs;current} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>1.20.0-beta.2</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs-checkpointstore-blob;current} -->
<version>1.20.0-beta.3</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs-checkpointstore-blob;current} -->
</dependency>

<!-- logging, tracing, metrics -->
Expand Down
4 changes: 2 additions & 2 deletions sdk/eventhubs/azure-messaging-eventhubs-track2-perf/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.19.0-beta.2</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs;current} -->
<version>5.19.0-beta.3</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs;current} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>1.20.0-beta.2</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs-checkpointstore-blob;current} -->
<version>1.20.0-beta.3</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs-checkpointstore-blob;current} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
15 changes: 14 additions & 1 deletion sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Release History

## 5.19.0-beta.2 (Unreleased)
## 5.19.0-beta.3 (Unreleased)

### Features Added

Expand All @@ -10,6 +10,19 @@

### Other Changes

## 5.19.0-beta.2 (2024-08-20)

### Features Added

- Enabling V2 stack support for Event Hubs, which can be opt-in using the configuration ` com.azure.messaging.eventhubs.v2`.

### Other Changes

#### Dependency Updates

- Upgraded `azure-core` from `1.50.0` to version `1.51.0`.
- Upgraded `azure-core-amqp` from `2.9.7` to version `2.9.8`.

## 5.18.6 (2024-07-26)

### Bugs Fixed
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhubs/azure-messaging-eventhubs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ add the direct dependency to your project as follows.
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.18.6</version>
<version>5.19.0-beta.2</version>
</dependency>
```
[//]: # ({x-version-update-end})
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhubs/azure-messaging-eventhubs/docs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>1.20.0-beta.2</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs-checkpointstore-blob;current} -->
<version>1.20.0-beta.3</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs-checkpointstore-blob;current} -->
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhubs/azure-messaging-eventhubs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.19.0-beta.2</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs;current} -->
<version>5.19.0-beta.3</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs;current} -->

<name>Microsoft Azure client library for Event Hubs</name>
<description>Libraries built on Microsoft Azure Event Hubs</description>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.eventhubs;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.implementation.ReactorConnectionCache;
import com.azure.messaging.eventhubs.implementation.EventHubAmqpConnection;
import com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor;
import com.azure.messaging.eventhubs.implementation.EventHubManagementNode;
import com.azure.messaging.eventhubs.implementation.EventHubReactorAmqpConnection;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;

import java.util.Objects;

import static com.azure.core.amqp.implementation.RetryUtil.withRetry;

/**
* Temporary type to support connection cache either in v1 or v2 stack.
* v2 underlying connection cache is {@link ReactorConnectionCache}
* v1 underlying connection cache is {@link EventHubConnectionProcessor}
*/
final class ConnectionCacheWrapper implements Disposable {
private final boolean isV2;
private final ReactorConnectionCache<EventHubReactorAmqpConnection> cache;
private final EventHubConnectionProcessor processor;

ConnectionCacheWrapper(ReactorConnectionCache<EventHubReactorAmqpConnection> cache) {
this.isV2 = true;
this.cache = Objects.requireNonNull(cache, "'cache' cannot be null.");
this.processor = null;
}

ConnectionCacheWrapper(EventHubConnectionProcessor processor) {
this.isV2 = false;
this.processor = Objects.requireNonNull(processor, "'processor' cannot be null.");
this.cache = null;
}

boolean isV2() {
return isV2;
}

Mono<EventHubAmqpConnection> getConnection() {
return isV2 ? cache.get().cast(EventHubAmqpConnection.class) : processor;
}

String getFullyQualifiedNamespace() {
return isV2 ? cache.getFullyQualifiedNamespace() : processor.getFullyQualifiedNamespace();
}

String getEventHubName() {
return isV2 ? cache.getEntityPath() : processor.getEventHubName();
}

AmqpRetryOptions getRetryOptions() {
return isV2 ? cache.getRetryOptions() : processor.getRetryOptions();
}

boolean isChannelClosed() {
return isV2 ? cache.isCurrentConnectionClosed() : processor.isChannelClosed();
}

Mono<EventHubManagementNode> getManagementNodeWithRetries() {
if (isV2) {
final AmqpRetryOptions retryOptions = cache.getRetryOptions();
return withRetry(cache.get().flatMap(EventHubReactorAmqpConnection::getManagementNode),
retryOptions, "Time out creating management node.");
} else {
return processor.getManagementNodeWithRetries();
}
}

@Override
public boolean isDisposed() {
return isV2 ? cache.isDisposed() : processor.isDisposed();
}

@Override
public void dispose() {
if (isV2) {
cache.dispose();
} else {
processor.dispose();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.metrics.Meter;
import com.azure.core.util.tracing.Tracer;
import com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor;
import com.azure.messaging.eventhubs.implementation.EventHubManagementNode;
import com.azure.messaging.eventhubs.implementation.instrumentation.EventHubsConsumerInstrumentation;
import reactor.core.publisher.Flux;
Expand All @@ -29,15 +28,15 @@
class EventHubAsyncClient implements Closeable {
private static final ClientLogger LOGGER = new ClientLogger(EventHubAsyncClient.class);
private final MessageSerializer messageSerializer;
private final EventHubConnectionProcessor connectionProcessor;
private final ConnectionCacheWrapper connectionProcessor;
private final Scheduler scheduler;
private final boolean isSharedConnection;
private final Runnable onClientClose;
private final String identifier;
private final Tracer tracer;
private final Meter meter;

EventHubAsyncClient(EventHubConnectionProcessor connectionProcessor, MessageSerializer messageSerializer,
EventHubAsyncClient(ConnectionCacheWrapper connectionProcessor, MessageSerializer messageSerializer,
Scheduler scheduler, boolean isSharedConnection, Runnable onClientClose, String identifier, Meter meter, Tracer tracer) {
this.messageSerializer = Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null.");
this.connectionProcessor = Objects.requireNonNull(connectionProcessor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.azure.core.amqp.implementation.ConnectionOptions;
import com.azure.core.amqp.implementation.ConnectionStringProperties;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.ReactorConnectionCache;
import com.azure.core.amqp.implementation.ReactorHandlerProvider;
import com.azure.core.amqp.implementation.ReactorProvider;
import com.azure.core.amqp.implementation.StringUtil;
Expand Down Expand Up @@ -257,7 +258,7 @@ public class EventHubClientBuilder implements
private String fullyQualifiedNamespace;
private String eventHubName;
private String consumerGroup;
private EventHubConnectionProcessor eventHubConnectionProcessor;
private ConnectionCacheWrapper eventHubConnectionProcessor;
private Integer prefetchCount;
private ClientOptions clientOptions;
private SslDomain.VerifyMode verifyMode;
Expand All @@ -275,6 +276,7 @@ public class EventHubClientBuilder implements
* Keeps track of the open clients that were created from this builder when there is a shared connection.
*/
private final AtomicInteger openClients = new AtomicInteger();
private final V2StackSupport v2StackSupport = new V2StackSupport(LOGGER);

/**
* Creates a new instance with the default transport {@link AmqpTransportType#AMQP} and a non-shared connection. A
Expand Down Expand Up @@ -989,11 +991,15 @@ EventHubAsyncClient buildAsyncClient() {

final MessageSerializer messageSerializer = new EventHubMessageSerializer();

final EventHubConnectionProcessor processor;
final ConnectionCacheWrapper processor;
if (isSharedConnection.get()) {
synchronized (connectionLock) {
if (eventHubConnectionProcessor == null) {
eventHubConnectionProcessor = buildConnectionProcessor(messageSerializer, meter);
if (v2StackSupport.isV2StackEnabled(configuration)) {
eventHubConnectionProcessor = new ConnectionCacheWrapper(buildConnectionCache(messageSerializer, meter));
} else {
eventHubConnectionProcessor = new ConnectionCacheWrapper(buildConnectionProcessor(messageSerializer, meter));
}
}
}

Expand All @@ -1002,7 +1008,11 @@ EventHubAsyncClient buildAsyncClient() {
final int numberOfOpenClients = openClients.incrementAndGet();
LOGGER.info("# of open clients with shared connection: {}", numberOfOpenClients);
} else {
processor = buildConnectionProcessor(messageSerializer, meter);
if (v2StackSupport.isV2StackEnabled(configuration)) {
processor = new ConnectionCacheWrapper(buildConnectionCache(messageSerializer, meter));
} else {
processor = new ConnectionCacheWrapper(buildConnectionProcessor(messageSerializer, meter));
}
}

String identifier;
Expand Down Expand Up @@ -1113,7 +1123,7 @@ private EventHubConnectionProcessor buildConnectionProcessor(MessageSerializer m

final EventHubAmqpConnection connection = new EventHubReactorAmqpConnection(connectionId,
connectionOptions, getEventHubName.get(), provider, handlerProvider, linkProvider, tokenManagerProvider,
messageSerializer);
messageSerializer, false);

sink.next(connection);
});
Expand All @@ -1123,6 +1133,17 @@ private EventHubConnectionProcessor buildConnectionProcessor(MessageSerializer m
connectionOptions.getFullyQualifiedNamespace(), getEventHubName.get(), connectionOptions.getRetry()));
}

private ReactorConnectionCache<EventHubReactorAmqpConnection> buildConnectionCache(MessageSerializer messageSerializer, Meter meter) {
final ConnectionOptions connectionOptions = getConnectionOptions();
final Supplier<String> getEventHubName = () -> {
if (CoreUtils.isNullOrEmpty(eventHubName)) {
throw LOGGER.logExceptionAsError(new IllegalArgumentException("'eventHubName' cannot be an empty string."));
}
return eventHubName;
};
return v2StackSupport.createConnectionCache(connectionOptions, getEventHubName, messageSerializer, meter);
}

ConnectionOptions getConnectionOptions() {
Configuration buildConfiguration = configuration == null
? Configuration.getGlobalConfiguration().clone()
Expand Down
Loading

0 comments on commit 4e61bd6

Please sign in to comment.