Skip to content

Commit

Permalink
Merge pull request #590 from Particular/fix-hanging-callbacks-3
Browse files Browse the repository at this point in the history
When actual response type does not match the expected type, callback tasks hang indefinitely
  • Loading branch information
tmasternak authored Jul 18, 2024
2 parents 9bf8a79 + 3c991a2 commit 60cbc7c
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
namespace NServiceBus.Callbacks.AcceptanceTests
{
using System;
using System.Threading;
using System.Threading.Tasks;
using AcceptanceTesting;
using NUnit.Framework;

public class When_controlmessage_response_does_not_match_expected_type : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_fail_the_request_task()
{
Exception exception = null;
Task requestTask = null;

await Scenario.Define<Context>()
.WithEndpoint<Replier>()
.WithEndpoint<EndpointWithLocalCallback>(b => b.When(async (bus, ctx) =>
{
try
{
requestTask = bus.Request<MyResponse>(new MyRequest());

await requestTask;
}
catch (Exception e)
{
exception = e;
}
}).DoNotFailOnErrorMessages())
.Done(c => exception != null)
.Run();

Assert.AreEqual(TaskStatus.Faulted, requestTask.Status);
Assert.IsNotNull(exception);
Assert.AreEqual(typeof(ArgumentException), exception.GetType());
}

class Context : ScenarioContext
{
}

class Replier : EndpointConfigurationBuilder
{
public Replier()
{
EndpointSetup<DefaultServer>(c => c.EnableCallbacks(makesRequests: false));
}

public class MyRequestHandler : IHandleMessages<MyRequest>
{
public Task Handle(MyRequest message, IMessageHandlerContext context)
{
return context.Reply(42);
}
}
}

class EndpointWithLocalCallback : EndpointConfigurationBuilder
{
public EndpointWithLocalCallback()
{
EndpointSetup<DefaultServer>(c =>
{
c.MakeInstanceUniquelyAddressable("1");
c.EnableCallbacks();
c.ConfigureTransport().Routing().RouteToEndpoint(typeof(MyRequest), typeof(Replier));
});
}
}

public class MyRequest : IMessage
{
}

public class MyResponse : IMessage
{
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
namespace NServiceBus.Callbacks.AcceptanceTests
{
using System;
using System.Threading;
using System.Threading.Tasks;
using AcceptanceTesting;
using NUnit.Framework;

public class When_message_response_does_not_match_expected_type : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_fail_the_request_task()
{
Exception exception = null;
Task requestTask = null;

await Scenario.Define<Context>()
.WithEndpoint<Replier>()
.WithEndpoint<EndpointWithLocalCallback>(b => b.When(async (bus, ctx) =>
{
try
{
requestTask = bus.Request<MyResponse>(new MyRequest());

await requestTask;
}
catch (Exception e)
{
exception = e;
}
}).DoNotFailOnErrorMessages())
.Done(c => exception != null)
.Run();

Assert.AreEqual(TaskStatus.Faulted, requestTask.Status);
Assert.IsNotNull(exception);
Assert.AreEqual(typeof(InvalidCastException), exception.GetType());
}

class Context : ScenarioContext
{
}

class Replier : EndpointConfigurationBuilder
{
public Replier()
{
EndpointSetup<DefaultServer>();
}

public class MyRequestHandler : IHandleMessages<MyRequest>
{
public Task Handle(MyRequest message, IMessageHandlerContext context)
{
return context.Reply(new BadResponse());
}
}
}

class EndpointWithLocalCallback : EndpointConfigurationBuilder
{
public EndpointWithLocalCallback()
{
EndpointSetup<DefaultServer>(c =>
{
c.MakeInstanceUniquelyAddressable("1");
c.EnableCallbacks();
c.ConfigureTransport().Routing().RouteToEndpoint(typeof(MyRequest), typeof(Replier));
});
}
}

public class MyRequest : IMessage
{
}

public class MyResponse : IMessage
{
}

public class BadResponse : IMessage
{
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,15 @@ void AssignResultIfPossible(IncomingMessage incomingMessage, IIncomingContext co

var responseType = result.Value.TaskCompletionSource.ResponseType;
var errorCode = incomingMessage.Headers[Headers.ReturnMessageErrorCodeHeader];
result.Value.TaskCompletionSource.TrySetResult(errorCode.ConvertFromReturnCode(responseType));
try
{
result.Value.TaskCompletionSource.TrySetResult(errorCode.ConvertFromReturnCode(responseType));
}
catch (Exception e)
{
result.Value.TaskCompletionSource.TrySetException(e);
throw;
}
}

static bool IsControlMessage(IncomingMessage incomingMessage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,16 @@ void AssignResultIfPossible(IncomingMessage incomingMessage, IIncomingLogicalMes
return;
}

result.Value.TaskCompletionSource.TrySetResult(context.Message.Instance);
context.MessageHandled = true;
try
{
result.Value.TaskCompletionSource.TrySetResult(context.Message.Instance);
context.MessageHandled = true;
}
catch (Exception e)
{
result.Value.TaskCompletionSource.TrySetException(e);
throw;
}
}

RequestResponseStateLookup requestResponseStateLookup;
Expand Down
9 changes: 9 additions & 0 deletions src/NServiceBus.Callbacks/TaskCompletionSourceAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ interface TaskCompletionSourceAdapter
void TrySetResult(object result);

void TrySetCanceled();

void TrySetException(Exception exception);
}

class TaskCompletionSourceAdapter<TResult> : TaskCompletionSourceAdapter
Expand Down Expand Up @@ -39,5 +41,12 @@ public void TrySetCanceled()
// Consider switching to TaskCreationOptions.RunContinuationsAsynchronously when updating the framework to 4.6. See https://blogs.msdn.microsoft.com/pfxteam/2015/02/02/new-task-apis-in-net-4-6/.
Task.Run(() => taskCompletionSource.TrySetCanceled()).Ignore();
}

public void TrySetException(Exception exception)
{
// prevent the continuation from blocking the pipeline by invoking it in parallel.
// Consider switching to TaskCreationOptions.RunContinuationsAsynchronously when updating the framework to 4.6. See https://blogs.msdn.microsoft.com/pfxteam/2015/02/02/new-task-apis-in-net-4-6/.
_ = Task.Run(() => taskCompletionSource.TrySetException(exception));
}
}
}

0 comments on commit 60cbc7c

Please sign in to comment.