Skip to content

Commit

Permalink
Merge pull request #220 from ayeshLK/listener-svc-impl
Browse files Browse the repository at this point in the history
Revamp `asb:Listener`/`asb:Service` based implementation
  • Loading branch information
ayeshLK authored May 20, 2024
2 parents bea4a03 + 7c4fc42 commit 4eae2fa
Show file tree
Hide file tree
Showing 26 changed files with 1,105 additions and 942 deletions.
2 changes: 1 addition & 1 deletion ballerina/Dependencies.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ modules = [
[[package]]
org = "ballerina"
name = "observe"
version = "1.2.2"
version = "1.2.3"
dependencies = [
{org = "ballerina", name = "jballerina.java"}
]
Expand Down
78 changes: 21 additions & 57 deletions ballerina/caller.bal
Original file line number Diff line number Diff line change
Expand Up @@ -16,78 +16,42 @@

import ballerina/jballerina.java as java;

# Azure Service Bus Caller to perform functions on dispatched messages.
public class Caller {

private final LogLevel logLevel;

isolated function init(LogLevel logLevel) {
self.logLevel = logLevel;
}
# Represents a ASB caller, which can be used to mark messages as complete, abandon, deadLetter, or defer.
public isolated client class Caller {

# Complete message from queue or subscription based on messageLockToken. Declares the message processing to be
# successfully completed, removing the message from the queue.
#
# + message - Message record
#
# + return - An `asb:Error` if failed to complete message or else `()`
public isolated function complete(@display {label: "Message record"} Message message) returns Error? {
return completeMessage(self, message.lockToken, self.logLevel);
}
isolated remote function complete() returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.NativeCaller"
} external;

# Abandon message from queue or subscription based on messageLockToken. Abandon processing of the message for
# the time being, returning the message immediately back to the queue to be picked up by another (or the same)
# receiver.
#
# + message - Message record
#
# + propertiesToModify - Message properties to modify
# + return - An `asb:Error` if failed to abandon message or else `()`
public isolated function abandon(@display {label: "Message record"} Message message) returns Error? {
return abandonMessage(self, message.lockToken, self.logLevel);
}
isolated remote function abandon(*record {|anydata...;|} propertiesToModify) returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.NativeCaller"
} external;

# Dead-Letter the message & moves the message to the Dead-Letter Queue based on messageLockToken. Transfer
# the message from the primary queue into a special "dead-letter sub-queue".
#
# + message - Message record
# + deadLetterReason - The deadletter reason.
# + deadLetterErrorDescription - The deadletter error description.
#
# + options - Options to specify while putting message in dead-letter queue
# + return - An `asb:Error` if failed to deadletter message or else `()`
public isolated function deadLetter(@display {label: "Message record"} Message message,
@display {label: "Dead letter reason (optional)"} string? deadLetterReason = (),
@display {label: "Dead letter description (optional)"}
string? deadLetterErrorDescription = ()) returns Error? {
return deadLetterMessage(self, message.lockToken, deadLetterReason,
deadLetterErrorDescription, self.logLevel);
}
isolated remote function deadLetter(*DeadLetterOptions options) returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.NativeCaller"
} external;

# Defer the message in a Queue or Subscription based on messageLockToken. It prevents the message from being
# directly received from the queue by setting it aside such that it must be received by sequence number.
#
# + message - Message record
#
# + propertiesToModify - Message properties to modify
# + return - An `asb:Error` if failed to defer message or else sequence number
public isolated function defer(@display {label: "Message record"} Message message)
returns @display {label: "Sequence Number of the deferred message"} int|Error {
check deferMessage(self, message.lockToken, self.logLevel);
return <int>message.sequenceNumber;
}
isolated remote function defer(*record {|anydata...;|} propertiesToModify) returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.NativeCaller"
} external;
}

isolated function completeMessage(Caller caller, string? lockToken, string? logLevel) returns Error? = @java:Method {
name: "complete",
'class: "org.ballerinax.asb.listener.Caller"
} external;

isolated function abandonMessage(Caller caller, string? lockToken, string? logLevel) returns Error? = @java:Method {
name: "abandon",
'class: "org.ballerinax.asb.listener.Caller"
} external;

isolated function deadLetterMessage(Caller caller, string? lockToken, string? deadLetterReason,
string? deadLetterErrorDescription, string? logLevel) returns Error? = @java:Method {
name: "deadLetter",
'class: "org.ballerinax.asb.listener.Caller"
} external;

isolated function deferMessage(Caller caller, string? lockToken, string? logLevel) returns Error? = @java:Method {
name: "defer",
'class: "org.ballerinax.asb.listener.Caller"
} external;
3 changes: 3 additions & 0 deletions ballerina/errors.bal
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
# Defines the common error type for the module.
public type Error distinct error;

# Error type to capture the errors occurred while retrieving messages in Azure service bus listener.
public type MessageRetrievalError distinct Error & error<ErrorContext>;

isolated function createError(string|error|Error errorOrMessage) returns Error {
if errorOrMessage is Error {
// input is a ASB 'error' value
Expand Down
157 changes: 70 additions & 87 deletions ballerina/listener.bal
Original file line number Diff line number Diff line change
Expand Up @@ -16,98 +16,81 @@

import ballerina/jballerina.java as java;

# Ballerina Azure Service Bus Message Listener.
public class Listener {

private final string connectionString;
private final handle listenerHandle;
private final LogLevel logLevel;
Caller caller;

# Gets invoked to initialize the `listener`.
# The listener initialization requires setting the credentials.
# Create an [Azure account](https://azure.microsoft.com) and obtain tokens following [this guide](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-quickstart-portal).
#
# + listenerConfig - The configurations to be used when initializing the `listener`
# + return - An error if listener initialization failed
public isolated function init(*ListenerConfig listenerConfig) returns error? {
self.connectionString = listenerConfig.connectionString;
self.logLevel = customConfiguration.logLevel;
self.listenerHandle = initListener(java:fromString(self.connectionString), java:fromString(self.logLevel));
self.caller = new Caller(self.logLevel);
externalInit(self.listenerHandle, self.caller);
# Represents a ASB consumer listener.
public isolated class Listener {

# Creates a new `asb:Listener`.
# ```ballerina
# listener asb:Listener asbListener = check new (
# connectionString = "xxxxxxxx",
# entityConfig = {
# queueName: "test-queue"
# },
# autoComplete = false
# );
# ```
#
# + config - ASB listener configurations
# + return - An `asb:Error` if an error is encountered or else '()'
public isolated function init(*ListenerConfiguration config) returns Error? {
return self.externInit(config);
}

# Starts consuming the messages on all the attached services.
#
# + return - `()` or else an error upon failure to start
public isolated function 'start() returns Error? {
return 'start(self.listenerHandle, self);
}

# Attaches the service to the `asb:Listener` endpoint.
#
# + s - Type descriptor of the service
# + name - Name of the service
# + return - `()` or else an error upon failure to register the service
public isolated function attach(MessageService s, string[]|string? name = ()) returns Error? {
return attach(self.listenerHandle, self, s);
}
private isolated function externInit(ListenerConfiguration config) returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.NativeListener"
} external;

# Stops consuming messages and detaches the service from the `asb:Listener` endpoint.
# Attaches an `asb:Service` to a listener.
# ```ballerina
# check asbListener.attach(asbService);
# ```
#
# + 'service - The service instance
# + name - Name of the service
# + return - An `asb:Error` if there is an error or else `()`
public isolated function attach(Service 'service, string[]|string? name = ()) returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.NativeListener"
} external;

# Detaches an `asb:Service` from the the listener.
# ```ballerina
# check asbListener.detach(asbService);
# ```
#
# + s - Type descriptor of the service
# + return - `()` or else an error upon failure to detach the service
public isolated function detach(MessageService s) returns Error? {
return detach(self.listenerHandle, self, s);
}

# Stops consuming messages through all consumer services by terminating the connection and all its channels.
# + 'service - The service to be detached
# + return - An `asb:Error` if there is an error or else `()`
public isolated function detach(Service 'service) returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.NativeListener"
} external;

# Starts the `asb:Listener`.
# ```ballerina
# check asbListener.'start();
# ```
#
# + return - `()` or else an error upon failure to close the `ChannelListener`
public isolated function gracefulStop() returns Error? {
return stop(self.listenerHandle, self);
}

# Stops consuming messages through all the consumer services and terminates the connection
# with the server.
# + return - An `asb:Error` if there is an error or else `()`
public isolated function 'start() returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.NativeListener"
} external;

# Stops the `asb:Listener` gracefully.
# ```ballerina
# check asbListener.gracefulStop();
# ```
#
# + return - `()` or else an error upon failure to close the `ChannelListener`.
public isolated function immediateStop() returns Error? {
return forceStop(self.listenerHandle, self);
}
# + return - An `asb:Error` if there is an error or else `()`
public isolated function gracefulStop() returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.NativeListener"
} external;

# Stops the `asb:Listener` immediately.
# ```ballerina
# check asbListener.immediateStop();
# ```
#
# + return - An `asb:Error` if there is an error or else `()`
public isolated function immediateStop() returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.NativeListener"
} external;
}

isolated function initListener(handle connectionString, handle logLevel)
returns handle = @java:Constructor {
'class: "org.ballerinax.asb.listener.MessageListener",
paramTypes: [
"java.lang.String",
"java.lang.String"
]
} external;

isolated function externalInit(handle listenerHandle, Caller caller) = @java:Method {
'class: "org.ballerinax.asb.listener.MessageListener"
} external;

isolated function 'start(handle listenerHandle, Listener lis) returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.MessageListener"
} external;

isolated function stop(handle listenerHandle, Listener lis) returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.MessageListener"
} external;

isolated function attach(handle listenerHandle, Listener lis, MessageService serviceType) returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.MessageListener"
} external;

isolated function detach(handle listenerHandle, Listener lis, MessageService serviceType) returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.MessageListener"
} external;

isolated function forceStop(handle listenerHandle, Listener lis) returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.MessageListener"
} external;

17 changes: 4 additions & 13 deletions ballerina/service_types.bal
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,9 @@
// specific language governing permissions and limitations
// under the License.

# Triggers when Choreo recieves a new message from Azure service bus. Available action: onMessage
public type MessageService service object {
# Triggers when a new message is received from Azure service bus
# + message - The Azure service bus message recieved
# + caller - The Azure service bus caller instance
# + return - Error on failure else nil()
isolated remote function onMessage(Message message, Caller caller) returns error?;
# The ASB service type.
public type Service distinct service object {
// isolated remote function onMessage(asb:Message message, asb:Caller caller) returns error?;

# Triggers when there is an error in message processing
#
# + context - Error message details
# + error - Ballerina error
# + return - Error on failure else nil()
isolated remote function onError(ErrorContext context, error 'error) returns error?;
// isolated remote function onError(asb:MessageRetrievalError 'error) returns error?;
};
Loading

0 comments on commit 4eae2fa

Please sign in to comment.