diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/CHANGELOG.md b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/CHANGELOG.md index 98d0ba9996aa8..1d50bfb60ddfa 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/CHANGELOG.md +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/CHANGELOG.md @@ -4,6 +4,8 @@ ### Features Added +- Update proto for support for `ServiceBusSessionMessageActions` and `RenewMessageLock` method. + ### Breaking Changes ### Bugs Fixed diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Grpc/Proto/settlement.proto b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Grpc/Proto/settlement.proto index 125fff115fd7d..48c07ed510d24 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Grpc/Proto/settlement.proto +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Grpc/Proto/settlement.proto @@ -2,6 +2,7 @@ syntax = "proto3"; import "google/protobuf/empty.proto"; import "google/protobuf/wrappers.proto"; +import "google/protobuf/timestamp.proto"; // this namespace will be shared between isolated worker and WebJobs extension so make it somewhat generic option csharp_namespace = "Microsoft.Azure.ServiceBus.Grpc"; @@ -19,6 +20,21 @@ service Settlement { // Defers a message rpc Defer (DeferRequest) returns (google.protobuf.Empty) {} + + // Renew message lock + rpc RenewMessageLock (RenewMessageLockRequest) returns (google.protobuf.Empty) {} + + // Get session state + rpc GetSessionState (GetSessionStateRequest) returns (GetSessionStateResponse) {} + + // Set session state + rpc SetSessionState (SetSessionStateRequest) returns (google.protobuf.Empty) {} + + // Release session + rpc ReleaseSession (ReleaseSessionRequest) returns (google.protobuf.Empty) {} + + // Renew session lock + rpc RenewSessionLock (RenewSessionLockRequest) returns (RenewSessionLockResponse) {} } // The complete message request containing the locktoken. @@ -45,3 +61,39 @@ message DeferRequest { string locktoken = 1; bytes propertiesToModify = 2; } + +// The renew message lock request containing the locktoken. +message RenewMessageLockRequest { + string locktoken = 1; +} + +// The get message request. +message GetSessionStateRequest { + string sessionId = 1; +} + +// The set message request. +message SetSessionStateRequest { + string sessionId = 1; + bytes sessionState = 2; +} + +// Get response containing the session state. +message GetSessionStateResponse { + bytes sessionState = 1; +} + +// Release session. +message ReleaseSessionRequest { + string sessionId = 1; +} + +// Renew session lock. +message RenewSessionLockRequest { + string sessionId = 1; +} + +// Renew session lock. +message RenewSessionLockResponse { + google.protobuf.Timestamp lockedUntil = 1; +} diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Grpc/SettlementService.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Grpc/SettlementService.cs index 6ce4b9c2e4a25..985c6534a2dc6 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Grpc/SettlementService.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Grpc/SettlementService.cs @@ -127,6 +127,105 @@ await tuple.Actions.DeadLetterMessageAsync( throw new RpcException (new Status(StatusCode.FailedPrecondition, $"LockToken {request.Locktoken} not found.")); } + public override async Task RenewMessageLock(RenewMessageLockRequest request, ServerCallContext context) + { + try + { + if (_provider.ActionsCache.TryGetValue(request.Locktoken, out var tuple)) + { + await tuple.Actions.RenewMessageLockAsync( + tuple.Message, + context.CancellationToken).ConfigureAwait(false); + return new Empty(); + } + } + catch (Exception ex) + { + throw new RpcException(new Status(StatusCode.Unknown, ex.ToString())); + } + + throw new RpcException(new Status(StatusCode.FailedPrecondition, $"LockToken {request.Locktoken} not found.")); + } + + public override async Task GetSessionState(GetSessionStateRequest request, ServerCallContext context) + { + try + { + if (_provider.SessionActionsCache.TryGetValue(request.SessionId, out var actions)) + { + var sessionState = await actions.GetSessionStateAsync(context.CancellationToken).ConfigureAwait(false); + return new GetSessionStateResponse + { + SessionState = ByteString.CopyFrom(sessionState) + }; + } + } + catch (Exception ex) + { + throw new RpcException(new Status(StatusCode.Unknown, ex.ToString())); + } + + throw new RpcException(new Status(StatusCode.FailedPrecondition, $"SessionId {request.SessionId} not found.")); + } + + public override async Task SetSessionState(SetSessionStateRequest request, ServerCallContext context) + { + try + { + if (_provider.SessionActionsCache.TryGetValue(request.SessionId, out var actions)) + { + await actions.SetSessionStateAsync(BinaryData.FromBytes(request.SessionState.ToByteArray()), + context.CancellationToken).ConfigureAwait(false); + return new Empty(); + } + } + catch (Exception ex) + { + throw new RpcException(new Status(StatusCode.Unknown, ex.ToString())); + } + + throw new RpcException(new Status(StatusCode.FailedPrecondition, $"SessionId {request.SessionId} not found.")); + } + + public override Task ReleaseSession(ReleaseSessionRequest request, ServerCallContext context) + { + try + { + if (_provider.SessionActionsCache.TryGetValue(request.SessionId, out var actions)) + { + actions.ReleaseSession(); + return Task.FromResult(new Empty()); + } + } + catch (Exception ex) + { + throw new RpcException(new Status(StatusCode.Unknown, ex.ToString())); + } + + throw new RpcException(new Status(StatusCode.FailedPrecondition, $"SessionId {request.SessionId} not found.")); + } + + public override async Task RenewSessionLock(RenewSessionLockRequest request, ServerCallContext context) + { + try + { + if (_provider.SessionActionsCache.TryGetValue(request.SessionId, out var actions)) + { + await actions.RenewSessionLockAsync().ConfigureAwait(false); + return new RenewSessionLockResponse + { + LockedUntil = actions.SessionLockedUntil.ToTimestamp() + }; + } + } + catch (Exception ex) + { + throw new RpcException(new Status(StatusCode.Unknown, ex.ToString())); + } + + throw new RpcException(new Status(StatusCode.FailedPrecondition, $"SessionId {request.SessionId} not found.")); + } + private static Dictionary DeserializeAmqpMap(ByteString mapBytes) { if (mapBytes == null || mapBytes == ByteString.Empty) diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs index d5fdfcf886f2e..1a4a0d2d65329 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs @@ -384,6 +384,10 @@ internal async Task ProcessSessionMessageAsync(ProcessSessionMessageEventArgs ar { var actions = new ServiceBusSessionMessageActions(args); _messagingProvider.ActionsCache.TryAdd(args.Message.LockToken, (args.Message, actions)); + if (_isSessionsEnabled) + { + _messagingProvider.SessionActionsCache.TryAdd(args.Message.SessionId, actions); + } if (!await _sessionMessageProcessor.Value.BeginProcessingMessageAsync(actions, args.Message, linkedCts.Token) .ConfigureAwait(false)) @@ -411,6 +415,10 @@ await _sessionMessageProcessor.Value.CompleteProcessingMessageAsync(actions, arg { receiveActions.EndExecutionScope(); _messagingProvider.ActionsCache.TryRemove(args.Message.LockToken, out _); + if (_isSessionsEnabled) + { + _messagingProvider.SessionActionsCache.TryRemove(args.Message.SessionId, out _); + } } } } @@ -496,6 +504,10 @@ private async Task RunBatchReceiveLoopAsync(CancellationTokenSource cancellation foreach (var message in messages) { _messagingProvider.ActionsCache.TryAdd(message.LockToken, (message, messageActions)); + if (_isSessionsEnabled) + { + _messagingProvider.SessionActionsCache.TryAdd(message.SessionId, (ServiceBusSessionMessageActions)messageActions); + } } var receiveActions = new ServiceBusReceiveActions(receiver); @@ -720,6 +732,10 @@ private async Task TriggerAndCompleteMessagesInternal(ServiceBusReceivedMessage[ foreach (var message in input.Messages) { _messagingProvider.ActionsCache.TryRemove(message.LockToken, out _); + if (_isSessionsEnabled) + { + _messagingProvider.SessionActionsCache.TryRemove(message.SessionId, out _); + } } } } diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessagingProvider.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessagingProvider.cs index c3ecd8fe12d15..8287e1e8aaccb 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessagingProvider.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessagingProvider.cs @@ -24,6 +24,7 @@ public class MessagingProvider internal ConcurrentDictionary MessageReceiverCache { get; } = new(); internal ConcurrentDictionary ClientCache { get; } = new(); internal ConcurrentDictionary ActionsCache { get; } = new(); + internal ConcurrentDictionary SessionActionsCache { get; } = new(); /// /// Initializes a new instance of . diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Grpc/ServiceBusGrpcEndToEndTests.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Grpc/ServiceBusGrpcEndToEndTests.cs index 9f7175ce8132d..914ad0e4ad688 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Grpc/ServiceBusGrpcEndToEndTests.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Grpc/ServiceBusGrpcEndToEndTests.cs @@ -232,6 +232,27 @@ public async Task BindToBatchAndAbandon() Assert.IsEmpty(provider.ActionsCache); } + [Test] + public async Task BindToMessageAndRenewMessageLock() + { + var host = BuildHost(); + var settlementImpl = host.Services.GetRequiredService(); + var provider = host.Services.GetRequiredService(); + ServiceBusBindToMessageAndRenewMessageLock.SettlementService = settlementImpl; + await using ServiceBusClient client = new ServiceBusClient(ServiceBusTestEnvironment.Instance.ServiceBusConnectionString); + + using (host) + { + var message = new ServiceBusMessage("foobar"); + var sender = client.CreateSender(FirstQueueScope.QueueName); + await sender.SendMessageAsync(message); + + bool result = _waitHandle1.WaitOne(SBTimeoutMills); + Assert.True(result); + } + Assert.IsEmpty(provider.ActionsCache); + } + public class ServiceBusBindToMessageAndComplete { internal static SettlementService SettlementService { get; set; } @@ -458,6 +479,21 @@ await SettlementService.Abandon( _waitHandle1.Set(); } } + + public class ServiceBusBindToMessageAndRenewMessageLock + { + internal static SettlementService SettlementService { get; set; } + public static async Task BindToMessage( + [ServiceBusTrigger(FirstQueueNameKey)] ServiceBusReceivedMessage message, ServiceBusReceiveActions receiveActions) + { + Assert.AreEqual("foobar", message.Body.ToString()); + var lockedBefore = message.LockedUntil; + await SettlementService.RenewMessageLock( + new RenewMessageLockRequest { Locktoken = message.LockToken }, + new MockServerCallContext()); + _waitHandle1.Set(); + } + } } } #endif \ No newline at end of file diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Grpc/ServiceBusGrpcSessionEndToEndTests.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Grpc/ServiceBusGrpcSessionEndToEndTests.cs index 3008f74fee790..6fba2da68024e 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Grpc/ServiceBusGrpcSessionEndToEndTests.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Grpc/ServiceBusGrpcSessionEndToEndTests.cs @@ -3,7 +3,9 @@ #if NET6_0_OR_GREATER using System; using System.Collections.Generic; +using System.IO.Hashing; using System.Linq; +using System.Text; using System.Threading; using System.Threading.Tasks; using Azure.Messaging.ServiceBus; @@ -12,11 +14,14 @@ using Grpc.Core; using Microsoft.Azure.Amqp; using Microsoft.Azure.Amqp.Encoding; +using Microsoft.Azure.Management.ResourceManager.Models; using Microsoft.Azure.ServiceBus.Grpc; using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Grpc; using Microsoft.Azure.WebJobs.ServiceBus; using Microsoft.Extensions.DependencyInjection; +using Newtonsoft.Json; using NUnit.Framework; +using static System.Net.Mime.MediaTypeNames; namespace Microsoft.Azure.WebJobs.Host.EndToEndTests { @@ -140,6 +145,88 @@ public async Task BindToSessionMessageAndAbandon() Assert.IsEmpty(provider.ActionsCache); } + [Test] + public async Task BindToSessionMessageAndSetAndGet() + { + var host = BuildHost(); + var settlementImpl = host.Services.GetRequiredService(); + var provider = host.Services.GetRequiredService(); + ServiceBusBindToSessionMessageAndSetAndGet.SettlementService = settlementImpl; + await using ServiceBusClient client = new ServiceBusClient(ServiceBusTestEnvironment.Instance.ServiceBusConnectionString); + + using (host) + { + var message = new ServiceBusMessage("foobar") { SessionId = "sessionId" }; + var sender = client.CreateSender(FirstQueueScope.QueueName); + await sender.SendMessageAsync(message); + + bool result = _waitHandle1.WaitOne(SBTimeoutMills); + Assert.True(result); + } + } + + [Test] + public async Task BindToSessionMessageAndSetAndGetBinaryData() + { + var host = BuildHost(); + var settlementImpl = host.Services.GetRequiredService(); + var provider = host.Services.GetRequiredService(); + ServiceBusBindToSessionMessageAndSetAndGetBinaryData.SettlementService = settlementImpl; + await using ServiceBusClient client = new ServiceBusClient(ServiceBusTestEnvironment.Instance.ServiceBusConnectionString); + + using (host) + { + byte[] predefinedData = { 0x48, 0x65 }; + var message = new ServiceBusMessage(BinaryData.FromBytes(predefinedData)) { SessionId = "sessionId" }; + var sender = client.CreateSender(FirstQueueScope.QueueName); + await sender.SendMessageAsync(message); + + bool result = _waitHandle1.WaitOne(SBTimeoutMills); + Assert.True(result); + } + } + + [Test] + public async Task BindToSessionMessageAndReleaseSession() + { + var host = BuildHost(); + var settlementImpl = host.Services.GetRequiredService(); + var provider = host.Services.GetRequiredService(); + ServiceBusBindToSessionMessageAndReleaseSession.SettlementService = settlementImpl; + await using ServiceBusClient client = new ServiceBusClient(ServiceBusTestEnvironment.Instance.ServiceBusConnectionString); + + using (host) + { + var message = new ServiceBusMessage("foobar") { SessionId = "sessionId" }; + var sender = client.CreateSender(FirstQueueScope.QueueName); + await sender.SendMessageAsync(message); + + bool result = _waitHandle1.WaitOne(SBTimeoutMills); + Assert.True(result); + } + Assert.IsEmpty(provider.SessionActionsCache); + } + + [Test] + public async Task BindToSessionMessageAndRenewSession() + { + var host = BuildHost(); + var settlementImpl = host.Services.GetRequiredService(); + var provider = host.Services.GetRequiredService(); + ServiceBusBindToSessionMessageAndRenewSessionLock.SettlementService = settlementImpl; + await using ServiceBusClient client = new ServiceBusClient(ServiceBusTestEnvironment.Instance.ServiceBusConnectionString); + + using (host) + { + var message = new ServiceBusMessage("foobar") { SessionId = "sessionId" }; + var sender = client.CreateSender(FirstQueueScope.QueueName); + await sender.SendMessageAsync(message); + + bool result = _waitHandle1.WaitOne(SBTimeoutMills); + Assert.True(result); + } + } + public class ServiceBusBindToSessionMessageAndComplete { internal static SettlementService SettlementService { get; set; } @@ -214,6 +301,119 @@ await SettlementService.Defer( } } + public class ServiceBusBindToSessionMessageAndRenewMessageLock + { + internal static SettlementService SettlementService { get; set; } + public static async Task BindToMessage( + [ServiceBusTrigger(FirstQueueNameKey, IsSessionsEnabled = true)] ServiceBusReceivedMessage message, ServiceBusReceiveActions receiveActions) + { + Assert.AreEqual("foobar", message.Body.ToString()); + await SettlementService.RenewMessageLock( + new RenewMessageLockRequest + { + Locktoken = message.LockToken, + }, + new MockServerCallContext()); + _waitHandle1.Set(); + } + } + + public class ServiceBusBindToSessionMessageAndSetAndGet + { + internal static SettlementService SettlementService { get; set; } + public static async Task BindToMessage( + [ServiceBusTrigger(FirstQueueNameKey, IsSessionsEnabled = true)] ServiceBusReceivedMessage message, ServiceBusReceiveActions receiveActions) + { + Assert.AreEqual("foobar", message.Body.ToString()); + await SettlementService.SetSessionState( + new SetSessionStateRequest + { + SessionId = message.SessionId, + SessionState = ByteString.CopyFromUtf8(message.Body.ToString()) + }, + new MockServerCallContext() + ); + var test = await SettlementService.GetSessionState( + new GetSessionStateRequest + { + SessionId = message.SessionId, + }, + new MockServerCallContext()); + Assert.IsNotEmpty(test.SessionState); + Assert.AreEqual("foobar", message.Body.ToString()); + _waitHandle1.Set(); + } + } + + public class ServiceBusBindToSessionMessageAndSetAndGetBinaryData + { + internal static SettlementService SettlementService { get; set; } + public static async Task BindToMessage( + [ServiceBusTrigger(FirstQueueNameKey, IsSessionsEnabled = true)] ServiceBusReceivedMessage message, ServiceBusReceiveActions receiveActions) + { + byte[] predefinedData = { 0x48, 0x65 }; + Assert.AreEqual(predefinedData, message.Body.ToArray()); + await SettlementService.SetSessionState( + new SetSessionStateRequest + { + SessionId = message.SessionId, + SessionState = ByteString.CopyFrom(predefinedData) + }, + new MockServerCallContext() + ); + var test = await SettlementService.GetSessionState( + new GetSessionStateRequest + { + SessionId = message.SessionId, + }, + new MockServerCallContext()); + Assert.IsNotEmpty(test.SessionState); + Assert.AreEqual(predefinedData, message.Body.ToArray()); + _waitHandle1.Set(); + } + } + + public class ServiceBusBindToSessionMessageAndReleaseSession + { + internal static SettlementService SettlementService { get; set; } + public static async Task BindToMessage( + [ServiceBusTrigger(FirstQueueNameKey, IsSessionsEnabled = true)] ServiceBusReceivedMessage message, ServiceBusReceiveActions receiveActions) + { + Assert.AreEqual("foobar", message.Body.ToString()); + await SettlementService.ReleaseSession( + new ReleaseSessionRequest + { + SessionId = message.SessionId + }, + new MockServerCallContext() + ); + Assert.AreEqual("foobar", message.Body.ToString()); + _waitHandle1.Set(); + } + } + + public class ServiceBusBindToSessionMessageAndRenewSessionLock + { + internal static SettlementService SettlementService { get; set; } + public static async Task BindToMessage( + [ServiceBusTrigger(FirstQueueNameKey, IsSessionsEnabled = true)] ServiceBusReceivedMessage message, ServiceBusReceiveActions receiveActions) + { + Assert.AreEqual("foobar", message.Body.ToString()); + // Check when the session lock is set to expire + var lockedUntil = message.LockedUntil; + + // Renew the session lock + await SettlementService.RenewSessionLock( + new RenewSessionLockRequest + { + SessionId = message.SessionId + }, + new MockServerCallContext() + ); + _waitHandle1.Set(); + } + } + public class ServiceBusBindToSessionMessageAndAbandon { internal static DateTime DateTimeNow { get; } = DateTime.UtcNow;