Skip to content

Commit

Permalink
Change Track 2 SB sendMessages() API for messages are being sent at a…
Browse files Browse the repository at this point in the history
… slow pace (#21014)

* [BUG] Track 2 SB API (Batch Send Messages) doesn't seem to work. Messages are being sent at a slow pace. #16127
  • Loading branch information
v-hongli1 authored Jun 9, 2021
1 parent 150f65b commit a9b28ec
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
import com.azure.core.util.logging.ClientLogger;

import java.nio.BufferOverflowException;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

import static com.azure.messaging.servicebus.implementation.MessageUtils.traceMessageSpan;

Expand All @@ -30,7 +32,7 @@ public final class ServiceBusMessageBatch {
private final MessageSerializer serializer;
private final List<ServiceBusMessage> serviceBusMessageList;
private final byte[] eventBytes;
private int sizeInBytes;
private final AtomicInteger sizeInBytes;
private final TracerProvider tracerProvider;
private final String entityPath;
private final String hostname;
Expand All @@ -40,8 +42,8 @@ public final class ServiceBusMessageBatch {
this.maxMessageSize = maxMessageSize;
this.contextProvider = contextProvider;
this.serializer = serializer;
this.serviceBusMessageList = new LinkedList<>();
this.sizeInBytes = (maxMessageSize / 65536) * 1024; // reserve 1KB for every 64KB
this.serviceBusMessageList = Collections.synchronizedList(new LinkedList<>());
this.sizeInBytes = new AtomicInteger((maxMessageSize / 65536) * 1024); // reserve 1KB for every 64KB
this.eventBytes = new byte[maxMessageSize];
this.tracerProvider = tracerProvider;
this.entityPath = entityPath;
Expand Down Expand Up @@ -72,7 +74,7 @@ public int getMaxSizeInBytes() {
* @return The size of the {@link ServiceBusMessageBatch batch} in bytes.
*/
public int getSizeInBytes() {
return this.sizeInBytes;
return this.sizeInBytes.get();
}

/**
Expand All @@ -97,9 +99,9 @@ public boolean tryAddMessage(final ServiceBusMessage serviceBusMessage) {
tracerProvider)
: serviceBusMessage;

final int size;
final AtomicInteger size = new AtomicInteger();
try {
size = getSize(serviceBusMessageUpdated, serviceBusMessageList.isEmpty());
size.set(getSize(serviceBusMessageUpdated, serviceBusMessageList.isEmpty()));
} catch (BufferOverflowException exception) {
final RuntimeException ex = new ServiceBusException(
new AmqpException(false, AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED,
Expand All @@ -109,12 +111,9 @@ public boolean tryAddMessage(final ServiceBusMessage serviceBusMessage) {
throw logger.logExceptionAsWarning(ex);
}

synchronized (lock) {
if (this.sizeInBytes + size > this.maxMessageSize) {
return false;
}

this.sizeInBytes += size;
if (this.sizeInBytes.addAndGet(size.get()) > this.maxMessageSize) {
this.sizeInBytes.addAndGet(-1 * size.get());
return false;
}

this.serviceBusMessageList.add(serviceBusMessageUpdated);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.StreamSupport;

import static com.azure.core.amqp.implementation.RetryUtil.getRetryPolicy;
import static com.azure.core.amqp.implementation.RetryUtil.withRetry;
Expand Down Expand Up @@ -574,7 +575,8 @@ private Mono<Void> sendIterable(Iterable<ServiceBusMessage> messages, ServiceBus
}

return createMessageBatch().flatMap(messageBatch -> {
messages.forEach(message -> messageBatch.tryAddMessage(message));
StreamSupport.stream(messages.spliterator(), true)
.forEach(message -> messageBatch.tryAddMessage(message));
return sendInternal(messageBatch, transaction);
});
}
Expand Down Expand Up @@ -635,32 +637,29 @@ private Mono<Void> sendInternal(ServiceBusMessageBatch batch, ServiceBusTransact

logger.info("Sending batch with size[{}].", batch.getCount());

Context sharedContext = null;
final List<org.apache.qpid.proton.message.Message> messages = new ArrayList<>();

for (int i = 0; i < batch.getMessages().size(); i++) {
final ServiceBusMessage event = batch.getMessages().get(i);
AtomicReference<Context> sharedContext = new AtomicReference<>(Context.NONE);
final List<org.apache.qpid.proton.message.Message> messages = Collections.synchronizedList(new ArrayList<>());
batch.getMessages().parallelStream().forEach(serviceBusMessage -> {
if (isTracingEnabled) {
parentContext.set(event.getContext());
if (i == 0) {
sharedContext = tracerProvider.getSharedSpanBuilder(SERVICE_BASE_NAME, parentContext.get());
parentContext.set(serviceBusMessage.getContext());
if (sharedContext.get().equals(Context.NONE)) {
sharedContext.set(tracerProvider.getSharedSpanBuilder(SERVICE_BASE_NAME, parentContext.get()));
}
tracerProvider.addSpanLinks(sharedContext.addData(SPAN_CONTEXT_KEY, event.getContext()));
tracerProvider.addSpanLinks(sharedContext.get().addData(SPAN_CONTEXT_KEY, serviceBusMessage.getContext()));
}
final org.apache.qpid.proton.message.Message message = messageSerializer.serialize(event);

final org.apache.qpid.proton.message.Message message = messageSerializer.serialize(serviceBusMessage);
final MessageAnnotations messageAnnotations = message.getMessageAnnotations() == null
? new MessageAnnotations(new HashMap<>())
: message.getMessageAnnotations();

message.setMessageAnnotations(messageAnnotations);
messages.add(message);
}
});

if (isTracingEnabled) {
final Context finalSharedContext = sharedContext == null
final Context finalSharedContext = sharedContext.get().equals(Context.NONE)
? Context.NONE
: sharedContext
: sharedContext.get()
.addData(ENTITY_PATH_KEY, entityName)
.addData(HOST_NAME_KEY, connectionProcessor.getFullyQualifiedNamespace())
.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ public void testStartStopResume() throws InterruptedException {
*/
@Test
public void testErrorRecovery() throws InterruptedException {

List<ServiceBusMessageContext> messageList = new ArrayList<>();
for (int i = 0; i < 2; i++) {
ServiceBusReceivedMessage serviceBusReceivedMessage =
Expand All @@ -204,6 +203,7 @@ public void testErrorRecovery() throws InterruptedException {
new ServiceBusMessageContext(serviceBusReceivedMessage);
messageList.add(serviceBusMessageContext);
}

final Flux<ServiceBusMessageContext> messageFlux = Flux.generate(() -> 0,
(state, sink) -> {
ServiceBusReceivedMessage serviceBusReceivedMessage =
Expand All @@ -220,11 +220,9 @@ public void testErrorRecovery() throws InterruptedException {
});

ServiceBusClientBuilder.ServiceBusReceiverClientBuilder receiverBuilder = getBuilder(messageFlux);

AtomicInteger messageId = new AtomicInteger();
AtomicReference<CountDownLatch> countDownLatch = new AtomicReference<>();
countDownLatch.set(new CountDownLatch(4));

AtomicBoolean assertionFailed = new AtomicBoolean();
StringBuffer messageIdNotMatched = new StringBuffer();
ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder,
Expand Down

1 comment on commit a9b28ec

@grzegorzkalisz
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi,

After upgrading lib from 7.0.1 to 7.3.0, I noticed that batch sending is reordering messages in Service Bus. I suppose this is due to the use of parallelStream() in the sendInternal method. Was it done on intentionally or by mistake?

Please sign in to comment.