From 4934e3906c0fb55fff18927c0cf7eb13c48ddfad Mon Sep 17 00:00:00 2001 From: ayeshLK Date: Fri, 17 May 2024 11:45:29 +0530 Subject: [PATCH 01/13] Restrcuture message receiver to be non-blocking --- ballerina/receiver.bal | 2 +- .../asb/receiver/MessageReceiver.java | 444 +++++++++--------- .../ReceiverNetworkThreadFactory.java | 35 ++ 3 files changed, 263 insertions(+), 218 deletions(-) create mode 100644 native/src/main/java/org/ballerinax/asb/receiver/ReceiverNetworkThreadFactory.java diff --git a/ballerina/receiver.bal b/ballerina/receiver.bal index 47ef7b88..ab748ac2 100644 --- a/ballerina/receiver.bal +++ b/ballerina/receiver.bal @@ -207,7 +207,7 @@ isolated function initializeReceiver(MessageReceiver receiverClient, handle conn 'class: "org.ballerinax.asb.receiver.MessageReceiver" } external; -isolated function receiveBatch(MessageReceiver endpointClient, int? maxMessageCount, int? serverWaitTime, boolean deadLettered) +isolated function receiveBatch(MessageReceiver endpointClient, int maxMessageCount, int? serverWaitTime, boolean deadLettered) returns MessageBatch|Error? = @java:Method { 'class: "org.ballerinax.asb.receiver.MessageReceiver" } external; diff --git a/native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java b/native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java index 90fa7e6b..38097cbd 100644 --- a/native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java +++ b/native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java @@ -26,6 +26,8 @@ import com.azure.messaging.servicebus.ServiceBusReceivedMessage; import com.azure.messaging.servicebus.ServiceBusReceiverClient; import com.azure.messaging.servicebus.models.DeadLetterOptions; +import io.ballerina.runtime.api.Environment; +import io.ballerina.runtime.api.Future; import io.ballerina.runtime.api.PredefinedTypes; import io.ballerina.runtime.api.creators.ErrorCreator; import io.ballerina.runtime.api.creators.TypeCreator; @@ -49,9 +51,12 @@ import java.time.Duration; import java.util.HashMap; -import java.util.LinkedList; +import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import static io.ballerina.runtime.api.creators.ValueCreator.createRecordValue; import static org.ballerinax.asb.util.ASBConstants.APPLICATION_PROPERTY_KEY; @@ -90,6 +95,8 @@ * Ballerina. */ public class MessageReceiver { + private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool( + new ReceiverNetworkThreadFactory()); private static final Logger LOGGER = LoggerFactory.getLogger(MessageReceiver.class); @@ -139,40 +146,35 @@ public static Object initializeReceiver(BObject receiverClient, String connectio * message. * @return Message Object of the received message. */ - public static Object receive(BObject receiverClient, Object serverWaitTime, - BTypedesc expectedType, Object deadLettered) { - try { - ServiceBusReceiverClient receiver; - if ((boolean) deadLettered) { - receiver = (ServiceBusReceiverClient) getDeadLetterMessageReceiverFromBObject(receiverClient); - } else { - receiver = getReceiverFromBObject(receiverClient); - } - IterableStream receivedMessages; - if (serverWaitTime != null) { - receivedMessages = receiver.receiveMessages(1, Duration.ofSeconds((long) serverWaitTime)); - } else { - receivedMessages = receiver.receiveMessages(1); - } - - ServiceBusReceivedMessage receivedMessage = null; - for (ServiceBusReceivedMessage message : receivedMessages) { - receivedMessage = message; - } - if (receivedMessage == null) { - return null; + public static Object receive(Environment env, BObject receiverClient, Object serverWaitTime, BTypedesc expectedType, + boolean deadLettered) { + ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient, deadLettered); + Future future = env.markAsync(); + EXECUTOR_SERVICE.submit(() -> { + try { + List messages = receiver + .receiveMessages(1, Duration.ofSeconds((long) serverWaitTime)) + .stream().toList(); + if (messages.isEmpty()) { + future.complete(null); + return; + } + ServiceBusReceivedMessage message = messages.get(0); + RecordType expectedRecordType = ASBUtils.getRecordType(expectedType); + BMap bMsg = constructExpectedMessageRecord(message, expectedRecordType); + future.complete(bMsg); + } catch (BError e) { + BError bError = ASBErrorCreator.fromBError(e); + future.complete(bError); + } catch (ServiceBusException e) { + BError bError = ASBErrorCreator.fromASBException(e); + future.complete(bError); + } catch (Exception e) { + BError bError = ASBErrorCreator.fromUnhandledException(e); + future.complete(bError); } - - LOGGER.debug("Received message with messageId: " + receivedMessage.getMessageId()); - RecordType expectedRecordType = ASBUtils.getRecordType(expectedType); - return constructExpectedMessageRecord(receiverClient, receivedMessage, expectedRecordType); - } catch (BError e) { - return ASBErrorCreator.fromBError(e); - } catch (ServiceBusException e) { - return ASBErrorCreator.fromASBException(e); - } catch (Exception e) { - return ASBErrorCreator.fromUnhandledException(e); - } + }); + return null; } /** @@ -182,48 +184,46 @@ public static Object receive(BObject receiverClient, Object serverWaitTime, * @param serverWaitTime Specified server wait time in seconds to receive message * @return message payload */ - - public static Object receivePayload(BObject receiverClient, Object serverWaitTime, - BTypedesc expectedType, Object deadLettered) { - try { - ServiceBusReceiverClient receiver; - if ((boolean) deadLettered) { - receiver = (ServiceBusReceiverClient) getDeadLetterMessageReceiverFromBObject(receiverClient); - } else { - receiver = getReceiverFromBObject(receiverClient); - } - IterableStream receivedMessages; - if (serverWaitTime != null) { - receivedMessages = receiver.receiveMessages(1, Duration.ofSeconds((long) serverWaitTime)); - } else { - receivedMessages = receiver.receiveMessages(1); - } - - ServiceBusReceivedMessage receivedMessage = null; - for (ServiceBusReceivedMessage message : receivedMessages) { - receivedMessage = message; - } - if (receivedMessage == null) { - return null; - } - - LOGGER.debug("Received message with messageId: " + receivedMessage.getMessageId()); - Object messageBody = getMessagePayload(receivedMessage); - if (messageBody instanceof byte[]) { - return getValueWithIntendedType((byte[]) messageBody, expectedType.getDescribingType()); - } else { - Optional bValue = convertJavaToBValue(receivedMessage.getMessageId(), messageBody); - return bValue.orElseGet(() -> - ErrorCreator.createError(StringUtils.fromString("Failed to bind the received ASB message " + - "value to the expected Ballerina type: '" + expectedType.toString() + "'"))); + public static Object receivePayload(Environment env, BObject receiverClient, Object serverWaitTime, + BTypedesc expectedType, boolean deadLettered) { + ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient, deadLettered); + Future future = env.markAsync(); + EXECUTOR_SERVICE.submit(() -> { + try { + List messages = receiver + .receiveMessages(1, Duration.ofSeconds((long) serverWaitTime)) + .stream().toList(); + if (messages.isEmpty()) { + future.complete(null); + return; + } + ServiceBusReceivedMessage message = messages.get(0); + + Object messageBody = getMessagePayload(message); + if (messageBody instanceof byte[] binaryPayload) { + Object messagePayload = getValueWithIntendedType(binaryPayload, expectedType.getDescribingType()); + future.complete(messagePayload); + } else { + Optional bValue = convertJavaToBValue(message.getMessageId(), messageBody); + String payloadBindingErr = String.format( + "Failed to bind the received ASB message value to the expected Ballerina type: '%s'", + expectedType.toString()); + Object messagePayload = bValue.orElseGet(() -> ErrorCreator.createError( + StringUtils.fromString(payloadBindingErr))); + future.complete(messagePayload); + } + } catch (BError e) { + BError bError = ASBErrorCreator.fromBError(e); + future.complete(bError); + } catch (ServiceBusException e) { + BError bError = ASBErrorCreator.fromASBException(e); + future.complete(bError); + } catch (Exception e) { + BError bError = ASBErrorCreator.fromUnhandledException(e); + future.complete(bError); } - } catch (BError e) { - return ASBErrorCreator.fromBError(e); - } catch (ServiceBusException e) { - return ASBErrorCreator.fromASBException(e); - } catch (Exception e) { - return ASBErrorCreator.fromUnhandledException(e); - } + }); + return null; } /** @@ -237,29 +237,46 @@ public static Object receivePayload(BObject receiverClient, Object serverWaitTim * @param serverWaitTime Server wait time. * @return Batch Message Object of the received batch of messages. */ - public static Object receiveBatch(BObject receiverClient, Object maxMessageCount, Object serverWaitTime - , Object deadLettered) { - try { - ServiceBusReceiverClient receiver; - if ((boolean) deadLettered) { - receiver = (ServiceBusReceiverClient) getDeadLetterMessageReceiverFromBObject(receiverClient); - } else { - receiver = getReceiverFromBObject(receiverClient); - } - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Waiting up to 'serverWaitTime' seconds for messages from " + receiver.getEntityPath()); + public static Object receiveBatch(Environment env, BObject receiverClient, long maxMessageCount, + Object serverWaitTime, boolean deadLettered) { + ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient, deadLettered); + Future future = env.markAsync(); + EXECUTOR_SERVICE.submit(() -> { + try { + IterableStream receivedMessageStream; + if (Objects.isNull(serverWaitTime)) { + receivedMessageStream = receiver.receiveMessages((int) maxMessageCount); + } else { + receivedMessageStream = receiver.receiveMessages( + (int) maxMessageCount, Duration.ofSeconds((long) serverWaitTime)); + } + List> bMessages = receivedMessageStream.stream().map(msg -> { + BMap bMsg = constructExpectedMessageRecord(msg, null); + bMsg.addNativeData(NATIVE_MESSAGE, msg); + return bMsg; + }).toList(); + BMap messageRecord = ValueCreator.createRecordValue(ModuleUtils.getModule(), + ASBConstants.MESSAGE_RECORD); + ArrayType sourceArrayType = TypeCreator.createArrayType(TypeUtils.getType(messageRecord)); + + Map value = new HashMap<>(); + value.put("messageCount", bMessages.size()); + value.put("messages", ValueCreator.createArrayValue(bMessages.toArray(new Object[0]), sourceArrayType)); + BMap bMsgBatch = createRecordValue( + ModuleUtils.getModule(), ASBConstants.MESSAGE_BATCH_RECORD, value); + future.complete(bMsgBatch); + } catch (BError e) { + BError bError = ASBErrorCreator.fromBError(e); + future.complete(bError); + } catch (ServiceBusException e) { + BError bError = ASBErrorCreator.fromASBException(e); + future.complete(bError); + } catch (Exception e) { + BError bError = ASBErrorCreator.fromUnhandledException(e); + future.complete(bError); } - return getReceivedMessageBatch(receiverClient, maxMessageCount, serverWaitTime, deadLettered); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return null; - } catch (BError e) { - return ASBErrorCreator.fromBError(e); - } catch (ServiceBusException e) { - return ASBErrorCreator.fromASBException(e); - } catch (Exception e) { - return ASBErrorCreator.fromUnhandledException(e); - } + }); + return null; } /** @@ -269,26 +286,27 @@ public static Object receiveBatch(BObject receiverClient, Object maxMessageCount * @param message Message object. * @return An error if failed to complete the message. */ - public static Object complete(BObject receiverClient, BMap message) { - try { - ServiceBusReceivedMessage nativeMessage = getNativeMessage(message); - ServiceBusReceiverClient receiver; - if (nativeMessage.getDeadLetterReason() != null) { - receiver = (ServiceBusReceiverClient) getDeadLetterMessageReceiverFromBObject(receiverClient); - } else { - receiver = getReceiverFromBObject(receiverClient); + public static Object complete(Environment env, BObject receiverClient, BMap message) { + ServiceBusReceivedMessage nativeMessage = getNativeMessage(message); + ServiceBusReceiverClient receiver = getReceiverFromBObject( + receiverClient, Objects.nonNull(nativeMessage.getDeadLetterReason())); + Future future = env.markAsync(); + EXECUTOR_SERVICE.submit(() -> { + try { + receiver.complete(nativeMessage); + future.complete(null); + } catch (BError e) { + BError bError = ASBErrorCreator.fromBError(e); + future.complete(bError); + } catch (ServiceBusException e) { + BError bError = ASBErrorCreator.fromASBException(e); + future.complete(bError); + } catch (Exception e) { + BError bError = ASBErrorCreator.fromUnhandledException(e); + future.complete(bError); } - receiver.complete(nativeMessage); - LOGGER.debug("Completed the message(Id: " + nativeMessage.getMessageId() + ") with lockToken " + - nativeMessage.getLockToken()); - return null; - } catch (BError e) { - return ASBErrorCreator.fromBError(e); - } catch (ServiceBusException e) { - return ASBErrorCreator.fromASBException(e); - } catch (Exception e) { - return ASBErrorCreator.fromUnhandledException(e); - } + }); + return null; } /** @@ -298,21 +316,26 @@ public static Object complete(BObject receiverClient, BMap mess * @param message Message object. * @return An error if failed to abandon the message. */ - public static Object abandon(BObject receiverClient, BMap message) { - try { - ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient); - ServiceBusReceivedMessage nativeMessage = getNativeMessage(message); - receiver.abandon(nativeMessage); - LOGGER.debug(String.format("Done abandoning a message(Id: %s) using its lock token from %n%s", - nativeMessage.getMessageId(), receiver.getEntityPath())); - return null; - } catch (BError e) { - return ASBErrorCreator.fromBError(e); - } catch (ServiceBusException e) { - return ASBErrorCreator.fromASBException(e); - } catch (Exception e) { - return ASBErrorCreator.fromUnhandledException(e); - } + public static Object abandon(Environment env, BObject receiverClient, BMap message) { + ServiceBusReceivedMessage nativeMessage = getNativeMessage(message); + ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient, false); + Future future = env.markAsync(); + EXECUTOR_SERVICE.submit(() -> { + try { + receiver.abandon(nativeMessage); + future.complete(null); + } catch (BError e) { + BError bError = ASBErrorCreator.fromBError(e); + future.complete(bError); + } catch (ServiceBusException e) { + BError bError = ASBErrorCreator.fromASBException(e); + future.complete(bError); + } catch (Exception e) { + BError bError = ASBErrorCreator.fromUnhandledException(e); + future.complete(bError); + } + }); + return null; } /** @@ -352,21 +375,26 @@ public static Object deadLetter(BObject receiverClient, BMap me * @param message Message object. * @return An error if failed to defer the message. */ - public static Object defer(BObject receiverClient, BMap message) { - try { - ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient); - ServiceBusReceivedMessage nativeMessage = getNativeMessage(message); - receiver.defer(nativeMessage); - LOGGER.debug(String.format("Done deferring a message(Id: %s) using its lock token from %s", - nativeMessage.getMessageId(), receiver.getEntityPath())); - return null; - } catch (BError e) { - return ASBErrorCreator.fromBError(e); - } catch (ServiceBusException e) { - return ASBErrorCreator.fromASBException(e); - } catch (Exception e) { - return ASBErrorCreator.fromUnhandledException(e); - } + public static Object defer(Environment env, BObject receiverClient, BMap message) { + ServiceBusReceivedMessage nativeMessage = getNativeMessage(message); + ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient, false); + Future future = env.markAsync(); + EXECUTOR_SERVICE.submit(() -> { + try { + receiver.defer(nativeMessage); + future.complete(null); + } catch (BError e) { + BError bError = ASBErrorCreator.fromBError(e); + future.complete(bError); + } catch (ServiceBusException e) { + BError bError = ASBErrorCreator.fromASBException(e); + future.complete(bError); + } catch (Exception e) { + BError bError = ASBErrorCreator.fromUnhandledException(e); + future.complete(bError); + } + }); + return null; } /** @@ -381,22 +409,30 @@ public static Object defer(BObject receiverClient, BMap message * its true identifier. * @return The received Message or null if there is no message for given sequence number. */ - public static Object receiveDeferred(BObject receiverClient, Object sequenceNumber) { - try { - ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient); - ServiceBusReceivedMessage receivedMessage = receiver.receiveDeferredMessage((long) sequenceNumber); - if (receivedMessage == null) { - return null; + public static Object receiveDeferred(Environment env, BObject receiverClient, long sequenceNumber) { + ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient, false); + Future future = env.markAsync(); + EXECUTOR_SERVICE.submit(() -> { + try { + ServiceBusReceivedMessage message = receiver.receiveDeferredMessage(sequenceNumber); + if (Objects.isNull(message)) { + future.complete(null); + return; + } + BMap bMsg = constructExpectedMessageRecord(message, null); + future.complete(bMsg); + } catch (BError e) { + BError bError = ASBErrorCreator.fromBError(e); + future.complete(bError); + } catch (ServiceBusException e) { + BError bError = ASBErrorCreator.fromASBException(e); + future.complete(bError); + } catch (Exception e) { + BError bError = ASBErrorCreator.fromUnhandledException(e); + future.complete(bError); } - LOGGER.debug("Received deferred message using its sequenceNumber from " + receiver.getEntityPath()); - return constructExpectedMessageRecord(receiverClient, receivedMessage, null); - } catch (BError e) { - return ASBErrorCreator.fromBError(e); - } catch (ServiceBusException e) { - return ASBErrorCreator.fromASBException(e); - } catch (Exception e) { - return ASBErrorCreator.fromUnhandledException(e); - } + }); + return null; } /** @@ -407,21 +443,26 @@ public static Object receiveDeferred(BObject receiverClient, Object sequenceNumb * @param message Message object. * @return An error if failed to renewLock of the message. */ - public static Object renewLock(BObject receiverClient, BMap message) { - try { - ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient); - ServiceBusReceivedMessage nativeMessage = getNativeMessage(message); - receiver.renewMessageLock(nativeMessage); - LOGGER.debug(String.format("Done renewing a message(Id: %s) using its lock token from %s", - nativeMessage.getMessageId(), receiver.getEntityPath())); - return null; - } catch (BError e) { - return ASBErrorCreator.fromBError(e); - } catch (ServiceBusException e) { - return ASBErrorCreator.fromASBException(e); - } catch (Exception e) { - return ASBErrorCreator.fromUnhandledException(e); - } + public static Object renewLock(Environment env, BObject receiverClient, BMap message) { + ServiceBusReceivedMessage nativeMessage = getNativeMessage(message); + ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient, false); + Future future = env.markAsync(); + EXECUTOR_SERVICE.submit(() -> { + try { + receiver.renewMessageLock(nativeMessage); + future.complete(null); + } catch (BError e) { + BError bError = ASBErrorCreator.fromBError(e); + future.complete(bError); + } catch (ServiceBusException e) { + BError bError = ASBErrorCreator.fromASBException(e); + future.complete(bError); + } catch (Exception e) { + BError bError = ASBErrorCreator.fromUnhandledException(e); + future.complete(bError); + } + }); + return null; } /** @@ -431,7 +472,7 @@ public static Object renewLock(BObject receiverClient, BMap mes */ public static Object closeReceiver(BObject receiverClient) { try { - ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient); + ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient, false); receiver.close(); LOGGER.debug("Closed the receiver"); return null; @@ -448,12 +489,10 @@ public static Object closeReceiver(BObject receiverClient) { * Converts the received message to the contextually expected Ballerina record type (or to anydata, if not * specified). * - * @param receiverClient Ballerina client object * @param message Received Message */ - private static BMap constructExpectedMessageRecord(BObject receiverClient, - ServiceBusReceivedMessage message, - RecordType expectedType) { + private static BMap constructExpectedMessageRecord(ServiceBusReceivedMessage message, + RecordType expectedType) { Map map = populateOptionalFieldsMap(message); Object messageBody = getMessagePayload(message); if (messageBody instanceof byte[]) { @@ -530,42 +569,6 @@ private static Object getMessagePayload(ServiceBusReceivedMessage receivedMessag } } - private static BMap getReceivedMessageBatch(BObject receiverClient, Object maxMessageCount, - Object serverWaitTime, Object deadLettered) - throws InterruptedException, ServiceBusException { - ServiceBusReceiverClient receiver; - if ((boolean) deadLettered) { - receiver = (ServiceBusReceiverClient) getDeadLetterMessageReceiverFromBObject(receiverClient); - } else { - receiver = getReceiverFromBObject(receiverClient); - } - int maxCount = Long.valueOf(maxMessageCount.toString()).intValue(); - IterableStream receivedMessageStream; - if (serverWaitTime != null) { - receivedMessageStream = receiver.receiveMessages(maxCount, Duration.ofSeconds((long) serverWaitTime)); - } else { - receivedMessageStream = receiver.receiveMessages(maxCount); - } - - LinkedList receivedMessages = new LinkedList<>(); - for (ServiceBusReceivedMessage receivedMessage : receivedMessageStream) { - BMap recordMap = constructExpectedMessageRecord(receiverClient, receivedMessage, null); - BMap messageRecord = createRecordValue(ModuleUtils.getModule(), - ASBConstants.MESSAGE_RECORD, recordMap); - messageRecord.addNativeData(NATIVE_MESSAGE, receivedMessage); - receivedMessages.add(messageRecord); - } - - BMap messageRecord = ValueCreator.createRecordValue(ModuleUtils.getModule(), - ASBConstants.MESSAGE_RECORD); - ArrayType sourceArrayType = TypeCreator.createArrayType(TypeUtils.getType(messageRecord)); - - Map map = new HashMap<>(); - map.put("messageCount", receivedMessages.size()); - map.put("messages", ValueCreator.createArrayValue(receivedMessages.toArray(new Object[0]), sourceArrayType)); - return createRecordValue(ModuleUtils.getModule(), ASBConstants.MESSAGE_BATCH_RECORD, map); - } - private static BMap getApplicationProperties(ServiceBusReceivedMessage message) { BMap applicationPropertiesRecord = createRecordValue(ModuleUtils.getModule(), ASBConstants.APPLICATION_PROPERTY_TYPE); @@ -603,6 +606,13 @@ private static void populateApplicationProperty(BMap applicatio } } + private static ServiceBusReceiverClient getReceiverFromBObject(BObject bReceiver, boolean isDeadLetter) { + if (isDeadLetter) { + return (ServiceBusReceiverClient) getDeadLetterMessageReceiverFromBObject(bReceiver); + } + return (ServiceBusReceiverClient) bReceiver.getNativeData(RECEIVER_CLIENT); + } + private static ServiceBusReceiverClient getReceiverFromBObject(BObject receiverObject) { return (ServiceBusReceiverClient) receiverObject.getNativeData(RECEIVER_CLIENT); } diff --git a/native/src/main/java/org/ballerinax/asb/receiver/ReceiverNetworkThreadFactory.java b/native/src/main/java/org/ballerinax/asb/receiver/ReceiverNetworkThreadFactory.java new file mode 100644 index 00000000..31dfc7b5 --- /dev/null +++ b/native/src/main/java/org/ballerinax/asb/receiver/ReceiverNetworkThreadFactory.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KINDither express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.ballerinax.asb.receiver; + +import java.util.concurrent.ThreadFactory; + +/** + * A {@link ThreadFactory} object that creates new threads on demand for ASB message-receiver network actions. + */ +public class ReceiverNetworkThreadFactory implements ThreadFactory { + private final String threadGroupName = "receiver-network-thread"; + + @Override + public Thread newThread(Runnable runnable) { + Thread gitActionThread = new Thread(runnable); + gitActionThread.setName(threadGroupName); + return gitActionThread; + } +} From d074b4747cde2964581903586c379bbff70264fe Mon Sep 17 00:00:00 2001 From: ayeshLK Date: Fri, 17 May 2024 11:49:12 +0530 Subject: [PATCH 02/13] Refactor code base --- .../asb/receiver/ReceiverNetworkThreadFactory.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/native/src/main/java/org/ballerinax/asb/receiver/ReceiverNetworkThreadFactory.java b/native/src/main/java/org/ballerinax/asb/receiver/ReceiverNetworkThreadFactory.java index 31dfc7b5..a8e725a5 100644 --- a/native/src/main/java/org/ballerinax/asb/receiver/ReceiverNetworkThreadFactory.java +++ b/native/src/main/java/org/ballerinax/asb/receiver/ReceiverNetworkThreadFactory.java @@ -28,8 +28,8 @@ public class ReceiverNetworkThreadFactory implements ThreadFactory { @Override public Thread newThread(Runnable runnable) { - Thread gitActionThread = new Thread(runnable); - gitActionThread.setName(threadGroupName); - return gitActionThread; + Thread receiverThread = new Thread(runnable); + receiverThread.setName(threadGroupName); + return receiverThread; } } From 924eeda29636d080ce0c44fca968d134c9eec4b3 Mon Sep 17 00:00:00 2001 From: ayeshLK Date: Fri, 17 May 2024 18:56:27 +0530 Subject: [PATCH 03/13] Restructure receiver --- .../ballerinax/asb/receiver/MessageReceiver.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java b/native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java index 38097cbd..44d93454 100644 --- a/native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java +++ b/native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java @@ -150,7 +150,7 @@ public static Object receive(Environment env, BObject receiverClient, Object ser boolean deadLettered) { ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient, deadLettered); Future future = env.markAsync(); - EXECUTOR_SERVICE.submit(() -> { + EXECUTOR_SERVICE.execute(() -> { try { List messages = receiver .receiveMessages(1, Duration.ofSeconds((long) serverWaitTime)) @@ -188,7 +188,7 @@ public static Object receivePayload(Environment env, BObject receiverClient, Obj BTypedesc expectedType, boolean deadLettered) { ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient, deadLettered); Future future = env.markAsync(); - EXECUTOR_SERVICE.submit(() -> { + EXECUTOR_SERVICE.execute(() -> { try { List messages = receiver .receiveMessages(1, Duration.ofSeconds((long) serverWaitTime)) @@ -241,7 +241,7 @@ public static Object receiveBatch(Environment env, BObject receiverClient, long Object serverWaitTime, boolean deadLettered) { ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient, deadLettered); Future future = env.markAsync(); - EXECUTOR_SERVICE.submit(() -> { + EXECUTOR_SERVICE.execute(() -> { try { IterableStream receivedMessageStream; if (Objects.isNull(serverWaitTime)) { @@ -291,7 +291,7 @@ public static Object complete(Environment env, BObject receiverClient, BMap { + EXECUTOR_SERVICE.execute(() -> { try { receiver.complete(nativeMessage); future.complete(null); @@ -320,7 +320,7 @@ public static Object abandon(Environment env, BObject receiverClient, BMap { + EXECUTOR_SERVICE.execute(() -> { try { receiver.abandon(nativeMessage); future.complete(null); @@ -379,7 +379,7 @@ public static Object defer(Environment env, BObject receiverClient, BMap { + EXECUTOR_SERVICE.execute(() -> { try { receiver.defer(nativeMessage); future.complete(null); @@ -412,7 +412,7 @@ public static Object defer(Environment env, BObject receiverClient, BMap { + EXECUTOR_SERVICE.execute(() -> { try { ServiceBusReceivedMessage message = receiver.receiveDeferredMessage(sequenceNumber); if (Objects.isNull(message)) { @@ -447,7 +447,7 @@ public static Object renewLock(Environment env, BObject receiverClient, BMap { + EXECUTOR_SERVICE.execute(() -> { try { receiver.renewMessageLock(nativeMessage); future.complete(null); From 804446d744c1ea33e2edf092036126fcf4f3faa7 Mon Sep 17 00:00:00 2001 From: ayeshLK Date: Mon, 20 May 2024 08:56:28 +0530 Subject: [PATCH 04/13] Add sender-thread-factory to the code-base --- .../sender/SenderNetworkThreadFactory.java | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 native/src/main/java/org/ballerinax/asb/sender/SenderNetworkThreadFactory.java diff --git a/native/src/main/java/org/ballerinax/asb/sender/SenderNetworkThreadFactory.java b/native/src/main/java/org/ballerinax/asb/sender/SenderNetworkThreadFactory.java new file mode 100644 index 00000000..a937292a --- /dev/null +++ b/native/src/main/java/org/ballerinax/asb/sender/SenderNetworkThreadFactory.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KINDither express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.ballerinax.asb.sender; + +import java.util.concurrent.ThreadFactory; + +/** + * A {@link ThreadFactory} object that creates new threads on demand for ASB message-sender network actions. + */ +public class SenderNetworkThreadFactory implements ThreadFactory { + private final String threadGroupName = "sender-network-thread"; + + @Override + public Thread newThread(Runnable runnable) { + Thread senderThread = new Thread(runnable); + senderThread.setName(threadGroupName); + return senderThread; + } +} From ccaf4d238ec8e715f2a26dc29f843ff839d0bc00 Mon Sep 17 00:00:00 2001 From: ayeshLK Date: Fri, 31 May 2024 17:48:10 +0530 Subject: [PATCH 05/13] [Automated] Update the toml files --- ballerina/Ballerina.toml | 6 +++--- ballerina/Dependencies.toml | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/ballerina/Ballerina.toml b/ballerina/Ballerina.toml index abab5893..c2d95a58 100644 --- a/ballerina/Ballerina.toml +++ b/ballerina/Ballerina.toml @@ -2,7 +2,7 @@ distribution = "2201.8.0" org = "ballerinax" name = "asb" -version = "3.8.0" +version = "3.8.1-SNAPSHOT" license= ["Apache-2.0"] authors = ["Ballerina"] keywords = ["IT Operations/Message Brokers", "Cost/Paid", "Vendor/Microsoft"] @@ -19,5 +19,5 @@ graalvmCompatible = true groupId = "org.ballerinax" artifactId = "asb-native" module = "asb-native" -version = "3.8.0" -path = "../native/build/libs/asb-native-3.8.0.jar" +version = "3.8.1-SNAPSHOT" +path = "../native/build/libs/asb-native-3.8.1-SNAPSHOT.jar" diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index ea02f1a4..a3c73353 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -127,7 +127,7 @@ modules = [ [[package]] org = "ballerinax" name = "asb" -version = "3.8.0" +version = "3.8.1-SNAPSHOT" dependencies = [ {org = "ballerina", name = "jballerina.java"}, {org = "ballerina", name = "lang.runtime"}, From a4b29917631c14a40da127e4c5a988464ef7c4a9 Mon Sep 17 00:00:00 2001 From: ayeshLK Date: Mon, 3 Jun 2024 17:59:10 +0530 Subject: [PATCH 06/13] Update receiver deadLetter and close functionalities to be reactive --- .../asb/receiver/MessageReceiver.java | 75 +++++++++++-------- 1 file changed, 43 insertions(+), 32 deletions(-) diff --git a/native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java b/native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java index 52309347..7a518567 100644 --- a/native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java +++ b/native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java @@ -347,25 +347,30 @@ public static Object abandon(Environment env, BObject receiverClient, BMap message, Object deadLetterReason, - Object deadLetterErrorDescription) { - try { - ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient); - ServiceBusReceivedMessage nativeMessage = getNativeMessage(message); - DeadLetterOptions options = new DeadLetterOptions() - .setDeadLetterErrorDescription(ASBUtils.convertString(deadLetterErrorDescription)); - options.setDeadLetterReason(ASBUtils.convertString(deadLetterReason)); - receiver.deadLetter(nativeMessage, options); - LOGGER.debug(String.format("Done dead-lettering a message(Id: %s) using its lock token from %s", - nativeMessage.getMessageId(), receiver.getEntityPath())); - return null; - } catch (BError e) { - return ASBErrorCreator.fromBError(e); - } catch (ServiceBusException e) { - return ASBErrorCreator.fromASBException(e); - } catch (Exception e) { - return ASBErrorCreator.fromUnhandledException(e); - } + public static Object deadLetter(Environment env, BObject receiverClient, BMap message, + Object deadLetterReason, Object deadLetterErrorDescription) { + ServiceBusReceivedMessage nativeMessage = getNativeMessage(message); + ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient, false); + Future future = env.markAsync(); + EXECUTOR_SERVICE.execute(() -> { + try { + DeadLetterOptions options = new DeadLetterOptions() + .setDeadLetterErrorDescription(ASBUtils.convertString(deadLetterErrorDescription)); + options.setDeadLetterReason(ASBUtils.convertString(deadLetterReason)); + receiver.deadLetter(nativeMessage, options); + future.complete(null); + } catch (BError e) { + BError bError = ASBErrorCreator.fromBError(e); + future.complete(bError); + } catch (ServiceBusException e) { + BError bError = ASBErrorCreator.fromASBException(e); + future.complete(bError); + } catch (Exception e) { + BError bError = ASBErrorCreator.fromUnhandledException(e); + future.complete(bError); + } + }); + return null; } /** @@ -470,19 +475,25 @@ public static Object renewLock(Environment env, BObject receiverClient, BMap { + try { + receiver.close(); + future.complete(null); + } catch (BError e) { + BError bError = ASBErrorCreator.fromBError(e); + future.complete(bError); + } catch (ServiceBusException e) { + BError bError = ASBErrorCreator.fromASBException(e); + future.complete(bError); + } catch (Exception e) { + BError bError = ASBErrorCreator.fromUnhandledException(e); + future.complete(bError); + } + }); + return null; } /** From 93eb6acb7b3e9a2cafbb57c0766420d8fb6e5029 Mon Sep 17 00:00:00 2001 From: ayeshLK Date: Mon, 3 Jun 2024 18:00:38 +0530 Subject: [PATCH 07/13] Fix failing test case --- ballerina/tests/asb_sender_receiver_negative_tests.bal | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/ballerina/tests/asb_sender_receiver_negative_tests.bal b/ballerina/tests/asb_sender_receiver_negative_tests.bal index 4c05b850..bd22fb3b 100644 --- a/ballerina/tests/asb_sender_receiver_negative_tests.bal +++ b/ballerina/tests/asb_sender_receiver_negative_tests.bal @@ -190,8 +190,14 @@ function testSendToInvalidTopic() returns error? { function testReceiveFromInvalidQueue() returns error? { log:printInfo("[[testReceiveFromInvalidQueue]]"); log:printInfo("Creating Asb message receiver."); - receiverConfig.entityConfig = {queueName: "non-existing-queue"}; - MessageReceiver messageReceiver = check new (receiverConfig); + ASBServiceReceiverConfig invalidReceiverConfig = { + connectionString: connectionString, + entityConfig: { + queueName: "non-existing-queue" + }, + receiveMode: PEEK_LOCK + }; + MessageReceiver messageReceiver = check new (invalidReceiverConfig); log:printInfo("Sending payloads via ASB sender"); Message|error? e = messageReceiver->receive(5); From b6421d16143229a4fc1765ebdf055765520b2532 Mon Sep 17 00:00:00 2001 From: ayeshLK Date: Mon, 3 Jun 2024 18:05:42 +0530 Subject: [PATCH 08/13] Update change log --- changelog.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/changelog.md b/changelog.md index e9abcd02..95c1aa00 100644 --- a/changelog.md +++ b/changelog.md @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- [Implement ASB sender/receiver client actions in a non-blocking way](https://github.com/ballerina-platform/ballerina-library/issues/4982) + +## [3.8.0] - 2024-05-31 + ### Added - [Add the listener-service implementation of the Azure service-bus connector](https://github.com/ballerina-platform/ballerina-library/issues/6495) From 2a51dff348f9e9a675a47caaef8994eb190d39f4 Mon Sep 17 00:00:00 2001 From: ayeshLK Date: Mon, 3 Jun 2024 18:34:04 +0530 Subject: [PATCH 09/13] Restructure negative test cases --- .../asb_sender_receiver_negative_tests.bal | 48 +++++++++++++++---- 1 file changed, 40 insertions(+), 8 deletions(-) diff --git a/ballerina/tests/asb_sender_receiver_negative_tests.bal b/ballerina/tests/asb_sender_receiver_negative_tests.bal index bd22fb3b..029a840e 100644 --- a/ballerina/tests/asb_sender_receiver_negative_tests.bal +++ b/ballerina/tests/asb_sender_receiver_negative_tests.bal @@ -31,7 +31,13 @@ function testReceivePayloadWithIncorrectExpectedType() returns error? { check messageSender->sendPayload(mapContent); log:printInfo("Creating Asb message receiver."); - receiverConfig.receiveMode = RECEIVE_AND_DELETE; + ASBServiceReceiverConfig receiverConfig = { + connectionString: connectionString, + entityConfig: { + queueName: testQueue1 + }, + receiveMode: RECEIVE_AND_DELETE + }; MessageReceiver messageReceiver = check new (receiverConfig); log:printInfo("Receiving from Asb receiver client."); @@ -61,7 +67,13 @@ function testInvalidComplete() returns error? { MessageSender messageSender = check new (senderConfig); log:printInfo("Initializing Asb receiver client."); - receiverConfig.receiveMode = RECEIVE_AND_DELETE; + ASBServiceReceiverConfig receiverConfig = { + connectionString: connectionString, + entityConfig: { + queueName: testQueue1 + }, + receiveMode: RECEIVE_AND_DELETE + }; MessageReceiver messageReceiver = check new (receiverConfig); @@ -102,7 +114,13 @@ function testInvalidAbandon() returns error? { MessageSender messageSender = check new (senderConfig); log:printInfo("Initializing Asb receiver client."); - receiverConfig.receiveMode = RECEIVE_AND_DELETE; + ASBServiceReceiverConfig receiverConfig = { + connectionString: connectionString, + entityConfig: { + queueName: testQueue1 + }, + receiveMode: RECEIVE_AND_DELETE + }; MessageReceiver messageReceiver = check new (receiverConfig); @@ -145,7 +163,13 @@ function testReceivePayloadWithUnsupportedUnionExpectedType() returns error? { check messageSender->sendPayload(mapContent); log:printInfo("Creating Asb message receiver."); - receiverConfig.receiveMode = RECEIVE_AND_DELETE; + ASBServiceReceiverConfig receiverConfig = { + connectionString: connectionString, + entityConfig: { + queueName: testQueue1 + }, + receiveMode: RECEIVE_AND_DELETE + }; MessageReceiver messageReceiver = check new (receiverConfig); log:printInfo("Receiving from Asb receiver client."); @@ -170,8 +194,12 @@ function testReceivePayloadWithUnsupportedUnionExpectedType() returns error? { function testSendToInvalidTopic() returns error? { log:printInfo("[[testSendToInvalidTopic]]"); log:printInfo("Creating Asb message sender."); - senderConfig.topicOrQueueName = "non-existing-topic"; - MessageSender messageSender = check new (senderConfig); + ASBServiceSenderConfig invalidSenderConfig = { + connectionString: connectionString, + entityType: QUEUE, + topicOrQueueName: "non-existing-topic" + }; + MessageSender messageSender = check new (invalidSenderConfig); log:printInfo("Sending payloads via ASB sender"); Error? e = messageSender->sendPayload("message"); @@ -216,8 +244,12 @@ function testReceiveFromInvalidQueue() returns error? { function testInvalidConnectionString() returns error? { log:printInfo("[[testInvalidConnectionString]]"); log:printInfo("Creating Asb message sender."); - senderConfig.connectionString = "invalid-connection-string"; - MessageSender|Error messageSender = new (senderConfig); + ASBServiceSenderConfig invalidSenderConfig = { + connectionString: "invalid-connection-string", + entityType: QUEUE, + topicOrQueueName: "testQueue1" + }; + MessageSender|Error messageSender = new (invalidSenderConfig); test:assertTrue(messageSender is error, msg = "Client creation should have failed."); test:assertEquals((messageSender).message(), "Error occurred while processing request: " + From be49eb945ed7b0d2dd90f3497c235e5f65b4188b Mon Sep 17 00:00:00 2001 From: ayeshLK Date: Mon, 3 Jun 2024 18:58:08 +0530 Subject: [PATCH 10/13] Update ASB sender network actions to be reactive --- .../ballerinax/asb/sender/MessageSender.java | 214 ++++++++++-------- 1 file changed, 121 insertions(+), 93 deletions(-) diff --git a/native/src/main/java/org/ballerinax/asb/sender/MessageSender.java b/native/src/main/java/org/ballerinax/asb/sender/MessageSender.java index a404315f..131300fc 100644 --- a/native/src/main/java/org/ballerinax/asb/sender/MessageSender.java +++ b/native/src/main/java/org/ballerinax/asb/sender/MessageSender.java @@ -26,6 +26,8 @@ import com.azure.messaging.servicebus.ServiceBusMessageBatch; import com.azure.messaging.servicebus.ServiceBusSenderClient; import com.azure.messaging.servicebus.models.CreateMessageBatchOptions; +import io.ballerina.runtime.api.Environment; +import io.ballerina.runtime.api.Future; import io.ballerina.runtime.api.TypeTags; import io.ballerina.runtime.api.types.Type; import io.ballerina.runtime.api.utils.StringUtils; @@ -49,6 +51,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import static org.ballerinax.asb.util.ASBUtils.getRetryOptions; @@ -56,6 +60,8 @@ * This facilitates the client operations of MessageSender client in Ballerina. */ public class MessageSender { + private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool( + new SenderNetworkThreadFactory()); private static final Logger LOGGER = LoggerFactory.getLogger(MessageSender.class); @@ -98,22 +104,26 @@ public static Object initializeSender(BObject senderClient, String connectionStr * @param message Input message record as a BMap * @return An error if failed to send the message */ - public static Object send(BObject senderClient, BMap message) { - try { - ServiceBusSenderClient sender = getSenderFromBObject(senderClient); - ServiceBusMessage messageToSend = constructMessage(message); - sender.sendMessage(messageToSend); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Sent the message successfully. Message Id = " + messageToSend.getMessageId()); + public static Object send(Environment env, BObject senderClient, BMap message) { + ServiceBusSenderClient sender = getSenderFromBObject(senderClient); + Future future = env.markAsync(); + EXECUTOR_SERVICE.execute(() -> { + try { + ServiceBusMessage messageToSend = constructMessage(message); + sender.sendMessage(messageToSend); + future.complete(null); + } catch (BError e) { + BError bError = ASBErrorCreator.fromBError(e); + future.complete(bError); + } catch (ServiceBusException e) { + BError bError = ASBErrorCreator.fromASBException(e); + future.complete(bError); + } catch (Exception e) { + BError bError = ASBErrorCreator.fromUnhandledException(e); + future.complete(bError); } - return null; - } catch (BError e) { - return ASBErrorCreator.fromBError(e); - } catch (ServiceBusException e) { - return ASBErrorCreator.fromASBException(e); - } catch (Exception e) { - return ASBErrorCreator.fromUnhandledException(e); - } + }); + return null; } /** @@ -124,23 +134,27 @@ public static Object send(BObject senderClient, BMap message) { * @param scheduleTime Input schedule time record as a BMap * @return An error if failed to send the message */ - public static Object schedule(BObject senderClient, BMap message, + public static Object schedule(Environment env, BObject senderClient, BMap message, BMap scheduleTime) { - try { - ServiceBusSenderClient sender = getSenderFromBObject(senderClient); - ServiceBusMessage messageToSend = constructMessage(message); - Long sequenceNumber = sender.scheduleMessage(messageToSend, constructOffset(scheduleTime)); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Scheduled the message successfully. Message Id = " + messageToSend.getMessageId()); + ServiceBusSenderClient sender = getSenderFromBObject(senderClient); + Future future = env.markAsync(); + EXECUTOR_SERVICE.execute(() -> { + try { + ServiceBusMessage messageToSend = constructMessage(message); + Long sequenceNumber = sender.scheduleMessage(messageToSend, constructOffset(scheduleTime)); + future.complete(sequenceNumber); + } catch (BError e) { + BError bError = ASBErrorCreator.fromBError(e); + future.complete(bError); + } catch (ServiceBusException e) { + BError bError = ASBErrorCreator.fromASBException(e); + future.complete(bError); + } catch (Exception e) { + BError bError = ASBErrorCreator.fromUnhandledException(e); + future.complete(bError); } - return sequenceNumber; - } catch (BError e) { - return ASBErrorCreator.fromBError(e); - } catch (ServiceBusException e) { - return ASBErrorCreator.fromASBException(e); - } catch (Exception e) { - return ASBErrorCreator.fromUnhandledException(e); - } + }); + return null; } /** @@ -149,21 +163,25 @@ public static Object schedule(BObject senderClient, BMap messag * @param sequenceNumber The sequence number of the message to cance * @return An error if failed to send the message */ - public static Object cancel(BObject senderClient, long sequenceNumber) { - try { - ServiceBusSenderClient sender = getSenderFromBObject(senderClient); - sender.cancelScheduledMessage(sequenceNumber); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Successfully cancelled scheduled message with sequenceNumber = " + sequenceNumber); + public static Object cancel(Environment env, BObject senderClient, long sequenceNumber) { + ServiceBusSenderClient sender = getSenderFromBObject(senderClient); + Future future = env.markAsync(); + EXECUTOR_SERVICE.execute(() -> { + try { + sender.cancelScheduledMessage(sequenceNumber); + future.complete(null); + } catch (BError e) { + BError bError = ASBErrorCreator.fromBError(e); + future.complete(bError); + } catch (ServiceBusException e) { + BError bError = ASBErrorCreator.fromASBException(e); + future.complete(bError); + } catch (Exception e) { + BError bError = ASBErrorCreator.fromUnhandledException(e); + future.complete(bError); } - return null; - } catch (BError e) { - return ASBErrorCreator.fromBError(e); - } catch (ServiceBusException e) { - return ASBErrorCreator.fromASBException(e); - } catch (Exception e) { - return ASBErrorCreator.fromUnhandledException(e); - } + }); + return null; } /** @@ -174,47 +192,51 @@ public static Object cancel(BObject senderClient, long sequenceNumber) { * @param messages Input batch message record as a BMap * @return An error if failed send the message. */ - public static Object sendBatch(BObject senderClient, BMap messages) { - try { - ServiceBusSenderClient sender = getSenderFromBObject(senderClient); - Map messagesMap = ASBUtils.toObjectMap(messages); - BArray messageArray = (BArray) messagesMap.get("messages"); - Collection messageBatch = new ArrayList<>(); - for (int i = 0; i < messageArray.getLength(); i++) { - BMap messageBMap = (BMap) messageArray.get(i); - ServiceBusMessage asbMessage = constructMessage(messageBMap); - messageBatch.add(asbMessage); - } - ServiceBusMessageBatch currentBatch = sender.createMessageBatch(new CreateMessageBatchOptions()); - for (ServiceBusMessage message : messageBatch) { - if (currentBatch.tryAddMessage(message)) { - continue; + public static Object sendBatch(Environment env, BObject senderClient, BMap messages) { + ServiceBusSenderClient sender = getSenderFromBObject(senderClient); + Future future = env.markAsync(); + EXECUTOR_SERVICE.execute(() -> { + try { + Map messagesMap = ASBUtils.toObjectMap(messages); + BArray messageArray = (BArray) messagesMap.get("messages"); + Collection messageBatch = new ArrayList<>(); + for (int i = 0; i < messageArray.getLength(); i++) { + BMap messageBMap = (BMap) messageArray.get(i); + ServiceBusMessage asbMessage = constructMessage(messageBMap); + messageBatch.add(asbMessage); } - // The batch is full, so we create a new batch and send the batch. - sender.sendMessages(currentBatch); - currentBatch = sender.createMessageBatch(); + ServiceBusMessageBatch currentBatch = sender.createMessageBatch(new CreateMessageBatchOptions()); + for (ServiceBusMessage message : messageBatch) { + if (currentBatch.tryAddMessage(message)) { + continue; + } + // The batch is full, so we create a new batch and send the batch. + sender.sendMessages(currentBatch); + currentBatch = sender.createMessageBatch(); - // Add that message that we couldn't before. - if (!currentBatch.tryAddMessage(message)) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Message is too large for an empty batch. Skipping. Max size: " - + currentBatch.getMaxSizeInBytes() + ". Message: " + - message.getBody().toString()); + // Add that message that we couldn't before. + if (!currentBatch.tryAddMessage(message)) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Message is too large for an empty batch. Skipping. Max size: " + + currentBatch.getMaxSizeInBytes() + ". Message: " + + message.getBody().toString()); + } } } + sender.sendMessages(currentBatch); + future.complete(null); + } catch (BError e) { + BError bError = ASBErrorCreator.fromBError(e); + future.complete(bError); + } catch (ServiceBusException e) { + BError bError = ASBErrorCreator.fromASBException(e); + future.complete(bError); + } catch (Exception e) { + BError bError = ASBErrorCreator.fromUnhandledException(e); + future.complete(bError); } - sender.sendMessages(currentBatch); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Sent the batch message successfully"); - } - return null; - } catch (BError e) { - return ASBErrorCreator.fromBError(e); - } catch (ServiceBusException e) { - return ASBErrorCreator.fromASBException(e); - } catch (Exception e) { - return ASBErrorCreator.fromUnhandledException(e); - } + }); + return null; } /** @@ -222,19 +244,25 @@ public static Object sendBatch(BObject senderClient, BMap messa * * @return @return An error if failed close the sender. */ - public static Object close(BObject senderClient) { - try { - ServiceBusSenderClient sender = getSenderFromBObject(senderClient); - sender.close(); - LOGGER.debug("Closed the sender. Identifier=" + sender.getIdentifier()); - return null; - } catch (BError e) { - return ASBErrorCreator.fromBError(e); - } catch (ServiceBusException e) { - return ASBErrorCreator.fromASBException(e); - } catch (Exception e) { - return ASBErrorCreator.fromUnhandledException(e); - } + public static Object close(Environment env, BObject senderClient) { + ServiceBusSenderClient sender = getSenderFromBObject(senderClient); + Future future = env.markAsync(); + EXECUTOR_SERVICE.execute(() -> { + try { + sender.close(); + future.complete(null); + } catch (BError e) { + BError bError = ASBErrorCreator.fromBError(e); + future.complete(bError); + } catch (ServiceBusException e) { + BError bError = ASBErrorCreator.fromASBException(e); + future.complete(bError); + } catch (Exception e) { + BError bError = ASBErrorCreator.fromUnhandledException(e); + future.complete(bError); + } + }); + return null; } private static ServiceBusMessage constructMessage(BMap message) { From 25632cc263a76d7219110ee4c4d0dfa24d1cfd76 Mon Sep 17 00:00:00 2001 From: ayeshLK Date: Fri, 7 Jun 2024 08:28:02 +0530 Subject: [PATCH 11/13] Rename thread group --- .../ballerinax/asb/receiver/ReceiverNetworkThreadFactory.java | 2 +- .../org/ballerinax/asb/sender/SenderNetworkThreadFactory.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/native/src/main/java/org/ballerinax/asb/receiver/ReceiverNetworkThreadFactory.java b/native/src/main/java/org/ballerinax/asb/receiver/ReceiverNetworkThreadFactory.java index a8e725a5..e70e28e4 100644 --- a/native/src/main/java/org/ballerinax/asb/receiver/ReceiverNetworkThreadFactory.java +++ b/native/src/main/java/org/ballerinax/asb/receiver/ReceiverNetworkThreadFactory.java @@ -24,7 +24,7 @@ * A {@link ThreadFactory} object that creates new threads on demand for ASB message-receiver network actions. */ public class ReceiverNetworkThreadFactory implements ThreadFactory { - private final String threadGroupName = "receiver-network-thread"; + private final String threadGroupName = "asb-receiver-network-thread"; @Override public Thread newThread(Runnable runnable) { diff --git a/native/src/main/java/org/ballerinax/asb/sender/SenderNetworkThreadFactory.java b/native/src/main/java/org/ballerinax/asb/sender/SenderNetworkThreadFactory.java index a937292a..5a3a1b7f 100644 --- a/native/src/main/java/org/ballerinax/asb/sender/SenderNetworkThreadFactory.java +++ b/native/src/main/java/org/ballerinax/asb/sender/SenderNetworkThreadFactory.java @@ -24,7 +24,7 @@ * A {@link ThreadFactory} object that creates new threads on demand for ASB message-sender network actions. */ public class SenderNetworkThreadFactory implements ThreadFactory { - private final String threadGroupName = "sender-network-thread"; + private final String threadGroupName = "asb-sender-network-thread"; @Override public Thread newThread(Runnable runnable) { From f0e504c63ed0871f5de284d12e3d32680f3d135e Mon Sep 17 00:00:00 2001 From: ayeshLK Date: Fri, 7 Jun 2024 08:30:59 +0530 Subject: [PATCH 12/13] Refactor receiver retrieval logic --- .../asb/receiver/MessageReceiver.java | 26 ++++++++----------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java b/native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java index 7a518567..d707fa92 100644 --- a/native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java +++ b/native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java @@ -148,7 +148,7 @@ public static Object initializeReceiver(BObject receiverClient, String connectio */ public static Object receive(Environment env, BObject receiverClient, Object serverWaitTime, BTypedesc expectedType, boolean deadLettered) { - ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient, deadLettered); + ServiceBusReceiverClient receiver = getNativeReceiver(receiverClient, deadLettered); Future future = env.markAsync(); EXECUTOR_SERVICE.execute(() -> { try { @@ -186,7 +186,7 @@ public static Object receive(Environment env, BObject receiverClient, Object ser */ public static Object receivePayload(Environment env, BObject receiverClient, Object serverWaitTime, BTypedesc expectedType, boolean deadLettered) { - ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient, deadLettered); + ServiceBusReceiverClient receiver = getNativeReceiver(receiverClient, deadLettered); Future future = env.markAsync(); EXECUTOR_SERVICE.execute(() -> { try { @@ -239,7 +239,7 @@ public static Object receivePayload(Environment env, BObject receiverClient, Obj */ public static Object receiveBatch(Environment env, BObject receiverClient, long maxMessageCount, Object serverWaitTime, boolean deadLettered) { - ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient, deadLettered); + ServiceBusReceiverClient receiver = getNativeReceiver(receiverClient, deadLettered); Future future = env.markAsync(); EXECUTOR_SERVICE.execute(() -> { try { @@ -288,7 +288,7 @@ public static Object receiveBatch(Environment env, BObject receiverClient, long */ public static Object complete(Environment env, BObject receiverClient, BMap message) { ServiceBusReceivedMessage nativeMessage = getNativeMessage(message); - ServiceBusReceiverClient receiver = getReceiverFromBObject( + ServiceBusReceiverClient receiver = getNativeReceiver( receiverClient, Objects.nonNull(nativeMessage.getDeadLetterReason())); Future future = env.markAsync(); EXECUTOR_SERVICE.execute(() -> { @@ -318,7 +318,7 @@ public static Object complete(Environment env, BObject receiverClient, BMap message) { ServiceBusReceivedMessage nativeMessage = getNativeMessage(message); - ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient, false); + ServiceBusReceiverClient receiver = getNativeReceiver(receiverClient, false); Future future = env.markAsync(); EXECUTOR_SERVICE.execute(() -> { try { @@ -350,7 +350,7 @@ public static Object abandon(Environment env, BObject receiverClient, BMap message, Object deadLetterReason, Object deadLetterErrorDescription) { ServiceBusReceivedMessage nativeMessage = getNativeMessage(message); - ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient, false); + ServiceBusReceiverClient receiver = getNativeReceiver(receiverClient, false); Future future = env.markAsync(); EXECUTOR_SERVICE.execute(() -> { try { @@ -382,7 +382,7 @@ public static Object deadLetter(Environment env, BObject receiverClient, BMap message) { ServiceBusReceivedMessage nativeMessage = getNativeMessage(message); - ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient, false); + ServiceBusReceiverClient receiver = getNativeReceiver(receiverClient, false); Future future = env.markAsync(); EXECUTOR_SERVICE.execute(() -> { try { @@ -415,7 +415,7 @@ public static Object defer(Environment env, BObject receiverClient, BMap { try { @@ -450,7 +450,7 @@ public static Object receiveDeferred(Environment env, BObject receiverClient, lo */ public static Object renewLock(Environment env, BObject receiverClient, BMap message) { ServiceBusReceivedMessage nativeMessage = getNativeMessage(message); - ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient, false); + ServiceBusReceiverClient receiver = getNativeReceiver(receiverClient, false); Future future = env.markAsync(); EXECUTOR_SERVICE.execute(() -> { try { @@ -476,7 +476,7 @@ public static Object renewLock(Environment env, BObject receiverClient, BMap { try { @@ -617,17 +617,13 @@ private static void populateApplicationProperty(BMap applicatio } } - private static ServiceBusReceiverClient getReceiverFromBObject(BObject bReceiver, boolean isDeadLetter) { + private static ServiceBusReceiverClient getNativeReceiver(BObject bReceiver, boolean isDeadLetter) { if (isDeadLetter) { return (ServiceBusReceiverClient) getDeadLetterMessageReceiverFromBObject(bReceiver); } return (ServiceBusReceiverClient) bReceiver.getNativeData(RECEIVER_CLIENT); } - private static ServiceBusReceiverClient getReceiverFromBObject(BObject receiverObject) { - return (ServiceBusReceiverClient) receiverObject.getNativeData(RECEIVER_CLIENT); - } - private static Object getDeadLetterMessageReceiverFromBObject(BObject receiverObject) { if (receiverObject.getNativeData(ASBConstants.DEAD_LETTER_RECEIVER_CLIENT) != null) { return receiverObject.getNativeData(ASBConstants.DEAD_LETTER_RECEIVER_CLIENT); From 942089afbddbce7da423cfa28cedd709c3dd7a56 Mon Sep 17 00:00:00 2001 From: ayeshLK Date: Fri, 7 Jun 2024 08:33:22 +0530 Subject: [PATCH 13/13] Refactor sender retrieval logic --- .../org/ballerinax/asb/sender/MessageSender.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/native/src/main/java/org/ballerinax/asb/sender/MessageSender.java b/native/src/main/java/org/ballerinax/asb/sender/MessageSender.java index 131300fc..2dbe4163 100644 --- a/native/src/main/java/org/ballerinax/asb/sender/MessageSender.java +++ b/native/src/main/java/org/ballerinax/asb/sender/MessageSender.java @@ -105,7 +105,7 @@ public static Object initializeSender(BObject senderClient, String connectionStr * @return An error if failed to send the message */ public static Object send(Environment env, BObject senderClient, BMap message) { - ServiceBusSenderClient sender = getSenderFromBObject(senderClient); + ServiceBusSenderClient sender = getNativeSender(senderClient); Future future = env.markAsync(); EXECUTOR_SERVICE.execute(() -> { try { @@ -136,7 +136,7 @@ public static Object send(Environment env, BObject senderClient, BMap message, BMap scheduleTime) { - ServiceBusSenderClient sender = getSenderFromBObject(senderClient); + ServiceBusSenderClient sender = getNativeSender(senderClient); Future future = env.markAsync(); EXECUTOR_SERVICE.execute(() -> { try { @@ -164,7 +164,7 @@ public static Object schedule(Environment env, BObject senderClient, BMap { try { @@ -193,7 +193,7 @@ public static Object cancel(Environment env, BObject senderClient, long sequence * @return An error if failed send the message. */ public static Object sendBatch(Environment env, BObject senderClient, BMap messages) { - ServiceBusSenderClient sender = getSenderFromBObject(senderClient); + ServiceBusSenderClient sender = getNativeSender(senderClient); Future future = env.markAsync(); EXECUTOR_SERVICE.execute(() -> { try { @@ -245,7 +245,7 @@ public static Object sendBatch(Environment env, BObject senderClient, BMap { try { @@ -363,7 +363,7 @@ private static void setClient(BObject senderObject, ServiceBusSenderClient clien senderObject.addNativeData(ASBConstants.SENDER_CLIENT, client); } - private static ServiceBusSenderClient getSenderFromBObject(BObject senderObject) { + private static ServiceBusSenderClient getNativeSender(BObject senderObject) { return (ServiceBusSenderClient) senderObject.getNativeData(ASBConstants.SENDER_CLIENT); } }