Skip to content

Commit

Permalink
Merge pull request #230 from ayeshLK/async-migration
Browse files Browse the repository at this point in the history
Update sender/receiver network calls to work reactively
  • Loading branch information
ayeshLK authored Jun 7, 2024
2 parents d4cb689 + 942089a commit aec6d84
Show file tree
Hide file tree
Showing 9 changed files with 518 additions and 359 deletions.
6 changes: 3 additions & 3 deletions ballerina/Ballerina.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
distribution = "2201.8.0"
org = "ballerinax"
name = "asb"
version = "3.8.0"
version = "3.8.1-SNAPSHOT"
license= ["Apache-2.0"]
authors = ["Ballerina"]
keywords = ["IT Operations/Message Brokers", "Cost/Paid", "Vendor/Microsoft"]
Expand All @@ -19,5 +19,5 @@ graalvmCompatible = true
groupId = "org.ballerinax"
artifactId = "asb-native"
module = "asb-native"
version = "3.8.0"
path = "../native/build/libs/asb-native-3.8.0.jar"
version = "3.8.1-SNAPSHOT"
path = "../native/build/libs/asb-native-3.8.1-SNAPSHOT.jar"
2 changes: 1 addition & 1 deletion ballerina/Dependencies.toml
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ modules = [
[[package]]
org = "ballerinax"
name = "asb"
version = "3.8.0"
version = "3.8.1-SNAPSHOT"
dependencies = [
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "lang.runtime"},
Expand Down
2 changes: 1 addition & 1 deletion ballerina/receiver.bal
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ isolated function initializeReceiver(MessageReceiver receiverClient, handle conn
'class: "org.ballerinax.asb.receiver.MessageReceiver"
} external;

isolated function receiveBatch(MessageReceiver endpointClient, int? maxMessageCount, int? serverWaitTime, boolean deadLettered)
isolated function receiveBatch(MessageReceiver endpointClient, int maxMessageCount, int? serverWaitTime, boolean deadLettered)
returns MessageBatch|Error? = @java:Method {
'class: "org.ballerinax.asb.receiver.MessageReceiver"
} external;
Expand Down
58 changes: 48 additions & 10 deletions ballerina/tests/asb_sender_receiver_negative_tests.bal
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,13 @@ function testReceivePayloadWithIncorrectExpectedType() returns error? {
check messageSender->sendPayload(mapContent);

log:printInfo("Creating Asb message receiver.");
receiverConfig.receiveMode = RECEIVE_AND_DELETE;
ASBServiceReceiverConfig receiverConfig = {
connectionString: connectionString,
entityConfig: {
queueName: testQueue1
},
receiveMode: RECEIVE_AND_DELETE
};
MessageReceiver messageReceiver = check new (receiverConfig);
log:printInfo("Receiving from Asb receiver client.");

Expand Down Expand Up @@ -61,7 +67,13 @@ function testInvalidComplete() returns error? {
MessageSender messageSender = check new (senderConfig);

log:printInfo("Initializing Asb receiver client.");
receiverConfig.receiveMode = RECEIVE_AND_DELETE;
ASBServiceReceiverConfig receiverConfig = {
connectionString: connectionString,
entityConfig: {
queueName: testQueue1
},
receiveMode: RECEIVE_AND_DELETE
};

MessageReceiver messageReceiver = check new (receiverConfig);

Expand Down Expand Up @@ -102,7 +114,13 @@ function testInvalidAbandon() returns error? {
MessageSender messageSender = check new (senderConfig);

log:printInfo("Initializing Asb receiver client.");
receiverConfig.receiveMode = RECEIVE_AND_DELETE;
ASBServiceReceiverConfig receiverConfig = {
connectionString: connectionString,
entityConfig: {
queueName: testQueue1
},
receiveMode: RECEIVE_AND_DELETE
};

MessageReceiver messageReceiver = check new (receiverConfig);

Expand Down Expand Up @@ -145,7 +163,13 @@ function testReceivePayloadWithUnsupportedUnionExpectedType() returns error? {
check messageSender->sendPayload(mapContent);

log:printInfo("Creating Asb message receiver.");
receiverConfig.receiveMode = RECEIVE_AND_DELETE;
ASBServiceReceiverConfig receiverConfig = {
connectionString: connectionString,
entityConfig: {
queueName: testQueue1
},
receiveMode: RECEIVE_AND_DELETE
};
MessageReceiver messageReceiver = check new (receiverConfig);
log:printInfo("Receiving from Asb receiver client.");

Expand All @@ -170,8 +194,12 @@ function testReceivePayloadWithUnsupportedUnionExpectedType() returns error? {
function testSendToInvalidTopic() returns error? {
log:printInfo("[[testSendToInvalidTopic]]");
log:printInfo("Creating Asb message sender.");
senderConfig.topicOrQueueName = "non-existing-topic";
MessageSender messageSender = check new (senderConfig);
ASBServiceSenderConfig invalidSenderConfig = {
connectionString: connectionString,
entityType: QUEUE,
topicOrQueueName: "non-existing-topic"
};
MessageSender messageSender = check new (invalidSenderConfig);

log:printInfo("Sending payloads via ASB sender");
Error? e = messageSender->sendPayload("message");
Expand All @@ -190,8 +218,14 @@ function testSendToInvalidTopic() returns error? {
function testReceiveFromInvalidQueue() returns error? {
log:printInfo("[[testReceiveFromInvalidQueue]]");
log:printInfo("Creating Asb message receiver.");
receiverConfig.entityConfig = {queueName: "non-existing-queue"};
MessageReceiver messageReceiver = check new (receiverConfig);
ASBServiceReceiverConfig invalidReceiverConfig = {
connectionString: connectionString,
entityConfig: {
queueName: "non-existing-queue"
},
receiveMode: PEEK_LOCK
};
MessageReceiver messageReceiver = check new (invalidReceiverConfig);

log:printInfo("Sending payloads via ASB sender");
Message|error? e = messageReceiver->receive(5);
Expand All @@ -210,8 +244,12 @@ function testReceiveFromInvalidQueue() returns error? {
function testInvalidConnectionString() returns error? {
log:printInfo("[[testInvalidConnectionString]]");
log:printInfo("Creating Asb message sender.");
senderConfig.connectionString = "invalid-connection-string";
MessageSender|Error messageSender = new (senderConfig);
ASBServiceSenderConfig invalidSenderConfig = {
connectionString: "invalid-connection-string",
entityType: QUEUE,
topicOrQueueName: "testQueue1"
};
MessageSender|Error messageSender = new (invalidSenderConfig);

test:assertTrue(messageSender is error, msg = "Client creation should have failed.");
test:assertEquals((<Error>messageSender).message(), "Error occurred while processing request: " +
Expand Down
6 changes: 6 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed

- [Implement ASB sender/receiver client actions in a non-blocking way](https://github.com/ballerina-platform/ballerina-library/issues/4982)

## [3.8.0] - 2024-05-31

### Added

- [Add the listener-service implementation of the Azure service-bus connector](https://github.com/ballerina-platform/ballerina-library/issues/6495)
Loading

0 comments on commit aec6d84

Please sign in to comment.