Skip to content

Commit 54d92a3

Browse files
authored
Simplify the logic reading session-id by making it final (#35822)
1 parent e1b23ad commit 54d92a3

File tree

3 files changed

+14
-22
lines changed

3 files changed

+14
-22
lines changed

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -338,9 +338,10 @@ private Flux<ServiceBusMessageContext> getSession(Scheduler scheduler, boolean d
338338
return existing;
339339
}
340340

341-
return new ServiceBusSessionReceiver(link, messageSerializer, connectionProcessor.getRetryOptions(),
341+
final Duration idleTimeout = disposeOnIdle ? sessionIdleTimeout : null;
342+
return new ServiceBusSessionReceiver(sessionId, link, messageSerializer, connectionProcessor.getRetryOptions(),
342343
receiverOptions.getPrefetchCount(), scheduler, this::renewSessionLock,
343-
maxSessionLockRenewDuration, disposeOnIdle ? sessionIdleTimeout : null);
344+
maxSessionLockRenewDuration, idleTimeout);
344345
})))
345346
.flatMapMany(sessionReceiver -> sessionReceiver.receive().doFinally(signalType -> {
346347
LOGGER.atVerbose()

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929

3030
import static com.azure.core.amqp.implementation.ClientConstants.ENTITY_PATH_KEY;
3131
import static com.azure.core.amqp.implementation.ClientConstants.LINK_NAME_KEY;
32-
import static com.azure.core.amqp.implementation.ClientConstants.NOT_APPLICABLE;
3332
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.MESSAGE_ID_LOGGING_KEY;
3433
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.SESSION_ID_KEY;
3534

@@ -47,8 +46,8 @@ class ServiceBusSessionReceiver implements AsyncCloseable, AutoCloseable {
4746
// also, the lock is removed after the completion of the message disposition.
4847
private final LockContainer<OffsetDateTime> lockContainer;
4948
private final AtomicReference<OffsetDateTime> sessionLockedUntil = new AtomicReference<>();
50-
private final AtomicReference<String> sessionId = new AtomicReference<>();
5149
private final AtomicReference<LockRenewalOperation> renewalOperation = new AtomicReference<>();
50+
private final String sessionId;
5251
private final ServiceBusReceiveLink receiveLink;
5352
private final Disposable.Composite subscriptions;
5453
private final Flux<ServiceBusMessageContext> receivedMessages;
@@ -61,6 +60,7 @@ class ServiceBusSessionReceiver implements AsyncCloseable, AutoCloseable {
6160
* Creates a receiver for the first available session.
6261
*
6362
* @param receiveLink Service Bus receive link for available session.
63+
* @param sessionId Identifier of the Service Bus Session that the receiver is associated with.
6464
* @param messageSerializer Serializes and deserializes messages from Service Bus.
6565
* @param retryOptions Retry options for the receiver.
6666
* @param prefetch Number of messages to prefetch from session.
@@ -71,10 +71,11 @@ class ServiceBusSessionReceiver implements AsyncCloseable, AutoCloseable {
7171
* @param sessionIdleTimeout Timeout after which session receiver will be disposed if there are no more messages
7272
* and the receiver is idle. Set it to {@code null} to not dispose receiver.
7373
*/
74-
ServiceBusSessionReceiver(ServiceBusReceiveLink receiveLink, MessageSerializer messageSerializer,
74+
ServiceBusSessionReceiver(String sessionId, ServiceBusReceiveLink receiveLink, MessageSerializer messageSerializer,
7575
AmqpRetryOptions retryOptions, int prefetch, Scheduler scheduler,
7676
Function<String, Mono<OffsetDateTime>> renewSessionLock, Duration maxSessionLockRenewDuration, Duration sessionIdleTimeout) {
7777

78+
this.sessionId = sessionId;
7879
this.receiveLink = receiveLink;
7980
this.lockContainer = new LockContainer<>(ServiceBusConstants.OPERATION_TIMEOUT);
8081
this.retryOptions = retryOptions;
@@ -157,14 +158,6 @@ class ServiceBusSessionReceiver implements AsyncCloseable, AutoCloseable {
157158
}));
158159
}
159160

160-
this.subscriptions.add(receiveLink.getSessionId().subscribe(id -> {
161-
if (!sessionId.compareAndSet(null, id)) {
162-
LOGGER.atWarning()
163-
.addKeyValue("existingSessionId", sessionId.get())
164-
.addKeyValue("returnedSessionId", id)
165-
.log("Another method set sessionId.");
166-
}
167-
}));
168161
this.subscriptions.add(receiveLink.getSessionLockedUntil().subscribe(lockedUntil -> {
169162
if (!sessionLockedUntil.compareAndSet(null, lockedUntil)) {
170163
withReceiveLinkInformation(LOGGER.atInfo())
@@ -174,7 +167,7 @@ class ServiceBusSessionReceiver implements AsyncCloseable, AutoCloseable {
174167

175168
return;
176169
}
177-
this.renewalOperation.compareAndSet(null, new LockRenewalOperation(sessionId.get(),
170+
this.renewalOperation.compareAndSet(null, new LockRenewalOperation(sessionId,
178171
maxSessionLockRenewDuration, true, renewSessionLock, lockedUntil));
179172
}));
180173
}
@@ -204,7 +197,7 @@ String getLinkName() {
204197
}
205198

206199
String getSessionId() {
207-
return sessionId.get();
200+
return sessionId;
208201
}
209202

210203
/**
@@ -254,9 +247,7 @@ public void close() {
254247
}
255248

256249
private LoggingEventBuilder withReceiveLinkInformation(LoggingEventBuilder builder) {
257-
final String current = sessionId.get();
258-
259-
return builder.addKeyValue(SESSION_ID_KEY, current != null ? current : NOT_APPLICABLE)
250+
return builder.addKeyValue(SESSION_ID_KEY, sessionId)
260251
.addKeyValue(ENTITY_PATH_KEY, receiveLink.getEntityPath())
261252
.addKeyValue(LINK_NAME_KEY, receiveLink.getLinkName());
262253
}

sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public void getsProperties() {
104104
final Duration maxSessionRenewalDuration = Duration.ofMinutes(5);
105105

106106
// Act
107-
final ServiceBusSessionReceiver sessionReceiver = new ServiceBusSessionReceiver(amqpReceiveLink,
107+
final ServiceBusSessionReceiver sessionReceiver = new ServiceBusSessionReceiver(sessionId, amqpReceiveLink,
108108
messageSerializer, retryOptions, 1, scheduler,
109109
unused -> renewSessionLock(Duration.ofMinutes(1)), maxSessionRenewalDuration, NO_SESSION_IDLE_TIMEOUT);
110110

@@ -152,7 +152,7 @@ public void receivesMessages() {
152152
final AmqpRetryOptions retryOptions = new AmqpRetryOptions();
153153
final Scheduler scheduler = Schedulers.boundedElastic();
154154
final Duration maxSessionRenewalDuration = Duration.ofMinutes(5);
155-
final ServiceBusSessionReceiver sessionReceiver = new ServiceBusSessionReceiver(amqpReceiveLink,
155+
final ServiceBusSessionReceiver sessionReceiver = new ServiceBusSessionReceiver(sessionId, amqpReceiveLink,
156156
messageSerializer, retryOptions, 1, scheduler,
157157
unused -> renewSessionLock(Duration.ofMinutes(1)), maxSessionRenewalDuration, NO_SESSION_IDLE_TIMEOUT);
158158

@@ -222,7 +222,7 @@ public void disposesOnIdle() {
222222
final AmqpRetryOptions retryOptions = new AmqpRetryOptions().setTryTimeout(Duration.ofMinutes(10));
223223
final Scheduler scheduler = Schedulers.boundedElastic();
224224
final Duration maxSessionRenewalDuration = Duration.ofMinutes(5);
225-
final ServiceBusSessionReceiver sessionReceiver = new ServiceBusSessionReceiver(amqpReceiveLink,
225+
final ServiceBusSessionReceiver sessionReceiver = new ServiceBusSessionReceiver(sessionId, amqpReceiveLink,
226226
messageSerializer, retryOptions, 1, scheduler,
227227
unused -> renewSessionLock(Duration.ofMinutes(1)), maxSessionRenewalDuration, waitTime);
228228

@@ -281,7 +281,7 @@ public void removesLockOnUpdateDisposition() {
281281
final AmqpRetryOptions retryOptions = new AmqpRetryOptions();
282282
final Scheduler scheduler = Schedulers.boundedElastic();
283283
final Duration maxSessionRenewalDuration = Duration.ofMinutes(5);
284-
final ServiceBusSessionReceiver sessionReceiver = new ServiceBusSessionReceiver(amqpReceiveLink,
284+
final ServiceBusSessionReceiver sessionReceiver = new ServiceBusSessionReceiver(sessionId, amqpReceiveLink,
285285
messageSerializer, retryOptions, 1, scheduler,
286286
unused -> renewSessionLock(Duration.ofMinutes(1)), maxSessionRenewalDuration, NO_SESSION_IDLE_TIMEOUT);
287287

0 commit comments

Comments
 (0)