Skip to content

Commit

Permalink
ServiceBusSessionMessageActions management APIs (Azure#43245)
Browse files Browse the repository at this point in the history
* initial changes

* adding other methods

* removing rpc contract for sessionLockedUntil

* addressing comments

* trying to get pipeline to pass

* fixing unit tests

* removing extra statement

* changes for binary data

* adding binary data

* fixing unit test

* changes

* update changelog
  • Loading branch information
aishwaryabh authored Apr 25, 2024
1 parent 67fc9cd commit de7f4bb
Show file tree
Hide file tree
Showing 7 changed files with 406 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### Features Added

- Update proto for support for `ServiceBusSessionMessageActions` and `RenewMessageLock` method.

### Breaking Changes

### Bugs Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ syntax = "proto3";

import "google/protobuf/empty.proto";
import "google/protobuf/wrappers.proto";
import "google/protobuf/timestamp.proto";

// this namespace will be shared between isolated worker and WebJobs extension so make it somewhat generic
option csharp_namespace = "Microsoft.Azure.ServiceBus.Grpc";
Expand All @@ -19,6 +20,21 @@ service Settlement {

// Defers a message
rpc Defer (DeferRequest) returns (google.protobuf.Empty) {}

// Renew message lock
rpc RenewMessageLock (RenewMessageLockRequest) returns (google.protobuf.Empty) {}

// Get session state
rpc GetSessionState (GetSessionStateRequest) returns (GetSessionStateResponse) {}

// Set session state
rpc SetSessionState (SetSessionStateRequest) returns (google.protobuf.Empty) {}

// Release session
rpc ReleaseSession (ReleaseSessionRequest) returns (google.protobuf.Empty) {}

// Renew session lock
rpc RenewSessionLock (RenewSessionLockRequest) returns (RenewSessionLockResponse) {}
}

// The complete message request containing the locktoken.
Expand All @@ -45,3 +61,39 @@ message DeferRequest {
string locktoken = 1;
bytes propertiesToModify = 2;
}

// The renew message lock request containing the locktoken.
message RenewMessageLockRequest {
string locktoken = 1;
}

// The get message request.
message GetSessionStateRequest {
string sessionId = 1;
}

// The set message request.
message SetSessionStateRequest {
string sessionId = 1;
bytes sessionState = 2;
}

// Get response containing the session state.
message GetSessionStateResponse {
bytes sessionState = 1;
}

// Release session.
message ReleaseSessionRequest {
string sessionId = 1;
}

// Renew session lock.
message RenewSessionLockRequest {
string sessionId = 1;
}

// Renew session lock.
message RenewSessionLockResponse {
google.protobuf.Timestamp lockedUntil = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,105 @@ await tuple.Actions.DeadLetterMessageAsync(
throw new RpcException (new Status(StatusCode.FailedPrecondition, $"LockToken {request.Locktoken} not found."));
}

public override async Task<Empty> RenewMessageLock(RenewMessageLockRequest request, ServerCallContext context)
{
try
{
if (_provider.ActionsCache.TryGetValue(request.Locktoken, out var tuple))
{
await tuple.Actions.RenewMessageLockAsync(
tuple.Message,
context.CancellationToken).ConfigureAwait(false);
return new Empty();
}
}
catch (Exception ex)
{
throw new RpcException(new Status(StatusCode.Unknown, ex.ToString()));
}

throw new RpcException(new Status(StatusCode.FailedPrecondition, $"LockToken {request.Locktoken} not found."));
}

public override async Task<GetSessionStateResponse> GetSessionState(GetSessionStateRequest request, ServerCallContext context)
{
try
{
if (_provider.SessionActionsCache.TryGetValue(request.SessionId, out var actions))
{
var sessionState = await actions.GetSessionStateAsync(context.CancellationToken).ConfigureAwait(false);
return new GetSessionStateResponse
{
SessionState = ByteString.CopyFrom(sessionState)
};
}
}
catch (Exception ex)
{
throw new RpcException(new Status(StatusCode.Unknown, ex.ToString()));
}

throw new RpcException(new Status(StatusCode.FailedPrecondition, $"SessionId {request.SessionId} not found."));
}

public override async Task<Empty> SetSessionState(SetSessionStateRequest request, ServerCallContext context)
{
try
{
if (_provider.SessionActionsCache.TryGetValue(request.SessionId, out var actions))
{
await actions.SetSessionStateAsync(BinaryData.FromBytes(request.SessionState.ToByteArray()),
context.CancellationToken).ConfigureAwait(false);
return new Empty();
}
}
catch (Exception ex)
{
throw new RpcException(new Status(StatusCode.Unknown, ex.ToString()));
}

throw new RpcException(new Status(StatusCode.FailedPrecondition, $"SessionId {request.SessionId} not found."));
}

public override Task<Empty> ReleaseSession(ReleaseSessionRequest request, ServerCallContext context)
{
try
{
if (_provider.SessionActionsCache.TryGetValue(request.SessionId, out var actions))
{
actions.ReleaseSession();
return Task.FromResult(new Empty());
}
}
catch (Exception ex)
{
throw new RpcException(new Status(StatusCode.Unknown, ex.ToString()));
}

throw new RpcException(new Status(StatusCode.FailedPrecondition, $"SessionId {request.SessionId} not found."));
}

public override async Task<RenewSessionLockResponse> RenewSessionLock(RenewSessionLockRequest request, ServerCallContext context)
{
try
{
if (_provider.SessionActionsCache.TryGetValue(request.SessionId, out var actions))
{
await actions.RenewSessionLockAsync().ConfigureAwait(false);
return new RenewSessionLockResponse
{
LockedUntil = actions.SessionLockedUntil.ToTimestamp()
};
}
}
catch (Exception ex)
{
throw new RpcException(new Status(StatusCode.Unknown, ex.ToString()));
}

throw new RpcException(new Status(StatusCode.FailedPrecondition, $"SessionId {request.SessionId} not found."));
}

private static Dictionary<string, object> DeserializeAmqpMap(ByteString mapBytes)
{
if (mapBytes == null || mapBytes == ByteString.Empty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,10 @@ internal async Task ProcessSessionMessageAsync(ProcessSessionMessageEventArgs ar
{
var actions = new ServiceBusSessionMessageActions(args);
_messagingProvider.ActionsCache.TryAdd(args.Message.LockToken, (args.Message, actions));
if (_isSessionsEnabled)
{
_messagingProvider.SessionActionsCache.TryAdd(args.Message.SessionId, actions);
}

if (!await _sessionMessageProcessor.Value.BeginProcessingMessageAsync(actions, args.Message, linkedCts.Token)
.ConfigureAwait(false))
Expand Down Expand Up @@ -411,6 +415,10 @@ await _sessionMessageProcessor.Value.CompleteProcessingMessageAsync(actions, arg
{
receiveActions.EndExecutionScope();
_messagingProvider.ActionsCache.TryRemove(args.Message.LockToken, out _);
if (_isSessionsEnabled)
{
_messagingProvider.SessionActionsCache.TryRemove(args.Message.SessionId, out _);
}
}
}
}
Expand Down Expand Up @@ -496,6 +504,10 @@ private async Task RunBatchReceiveLoopAsync(CancellationTokenSource cancellation
foreach (var message in messages)
{
_messagingProvider.ActionsCache.TryAdd(message.LockToken, (message, messageActions));
if (_isSessionsEnabled)
{
_messagingProvider.SessionActionsCache.TryAdd(message.SessionId, (ServiceBusSessionMessageActions)messageActions);
}
}

var receiveActions = new ServiceBusReceiveActions(receiver);
Expand Down Expand Up @@ -720,6 +732,10 @@ private async Task TriggerAndCompleteMessagesInternal(ServiceBusReceivedMessage[
foreach (var message in input.Messages)
{
_messagingProvider.ActionsCache.TryRemove(message.LockToken, out _);
if (_isSessionsEnabled)
{
_messagingProvider.SessionActionsCache.TryRemove(message.SessionId, out _);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public class MessagingProvider
internal ConcurrentDictionary<string, ServiceBusReceiver> MessageReceiverCache { get; } = new();
internal ConcurrentDictionary<string, ServiceBusClient> ClientCache { get; } = new();
internal ConcurrentDictionary<string, (ServiceBusReceivedMessage Message, ServiceBusMessageActions Actions)> ActionsCache { get; } = new();
internal ConcurrentDictionary<string, ServiceBusSessionMessageActions> SessionActionsCache { get; } = new();

/// <summary>
/// Initializes a new instance of <see cref="MessagingProvider"/>.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,27 @@ public async Task BindToBatchAndAbandon()
Assert.IsEmpty(provider.ActionsCache);
}

[Test]
public async Task BindToMessageAndRenewMessageLock()
{
var host = BuildHost<ServiceBusBindToMessageAndRenewMessageLock>();
var settlementImpl = host.Services.GetRequiredService<SettlementService>();
var provider = host.Services.GetRequiredService<MessagingProvider>();
ServiceBusBindToMessageAndRenewMessageLock.SettlementService = settlementImpl;
await using ServiceBusClient client = new ServiceBusClient(ServiceBusTestEnvironment.Instance.ServiceBusConnectionString);

using (host)
{
var message = new ServiceBusMessage("foobar");
var sender = client.CreateSender(FirstQueueScope.QueueName);
await sender.SendMessageAsync(message);

bool result = _waitHandle1.WaitOne(SBTimeoutMills);
Assert.True(result);
}
Assert.IsEmpty(provider.ActionsCache);
}

public class ServiceBusBindToMessageAndComplete
{
internal static SettlementService SettlementService { get; set; }
Expand Down Expand Up @@ -458,6 +479,21 @@ await SettlementService.Abandon(
_waitHandle1.Set();
}
}

public class ServiceBusBindToMessageAndRenewMessageLock
{
internal static SettlementService SettlementService { get; set; }
public static async Task BindToMessage(
[ServiceBusTrigger(FirstQueueNameKey)] ServiceBusReceivedMessage message, ServiceBusReceiveActions receiveActions)
{
Assert.AreEqual("foobar", message.Body.ToString());
var lockedBefore = message.LockedUntil;
await SettlementService.RenewMessageLock(
new RenewMessageLockRequest { Locktoken = message.LockToken },
new MockServerCallContext());
_waitHandle1.Set();
}
}
}
}
#endif
Loading

0 comments on commit de7f4bb

Please sign in to comment.