-
Notifications
You must be signed in to change notification settings - Fork 31
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AMQNET-637 NMS 2.0 #18
Conversation
4c90a49
to
f2832bc
Compare
f2832bc
to
fafd30a
Compare
src/Transport/ResponseCorrelator.cs
Outdated
TaskCompletionSource<Response> taskCompletionSource = new TaskCompletionSource<Response>(TaskCreationOptions.RunContinuationsAsynchronously); | ||
if (timeout.TotalMilliseconds > 0) | ||
{ | ||
CancellationTokenSource ct = new CancellationTokenSource(timeout); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CancellationTokenSource
is disposable. There is a resource leek here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure, but treid to update it anyway
src/Transport/ResponseCorrelator.cs
Outdated
taskCompletionSource.TrySetException(new RequestTimedOutException(timeout)); | ||
}, false); | ||
} | ||
|
||
FutureResponse future = AsyncRequest(command); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This FutureResponse
is some Java variation around TaskCompletionSource
, I'd drop it altogether. Is doesn't add any value, but complicates logic dramatically.
src/Transport/ResponseCorrelator.cs
Outdated
Response response = future.Response; | ||
return response; | ||
|
||
_ = future.Task.ContinueWith(t => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic you are applying here should be applied as part of OnCommand
. You basically should resolve promise there.
src/Transport/ResponseCorrelator.cs
Outdated
{ | ||
if (t.IsCompleted) | ||
{ | ||
taskCompletionSource.SetResult(t.Result); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use TrySet*
instead of Set*
on tcs. Set may throw an InvalidOperationException
, where value is already set.
@@ -18,7 +18,9 @@ | |||
using System; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd remove most of the synchronisation blocks and replace Hashtable.Synchronized(new Hashtable()) with thread safe ConcurrentDictionary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I gave it a try but without this locking, had some group of tests failing.
@@ -25,8 +26,13 @@ namespace Apache.NMS.ActiveMQ.Transport | |||
/// <summary> | |||
/// Handles asynchronous responses | |||
/// </summary> | |||
public class FutureResponse | |||
public class FutureResponse : TaskCompletionSource<Response> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So it seems you have 2 TCS
s to handle a single request-reply. One should be enough.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seemed so : ), did a little refactoring, its bit more lean now
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Havret, so once you have some time please look again at this pr
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd take a step back and rethink handling of async operations.
src/Transport/ResponseCorrelator.cs
Outdated
} | ||
}); | ||
|
||
return taskCompletionSource.Task; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should handle scenario when the operation is cancelled and remove future or TCS from list of pending operations.
|
||
public static T GetAsyncResult<T>(this Task<T> task) | ||
{ | ||
return task.Await().GetAwaiter().GetResult(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When you're using GetResult
there is no await
so there is no point of calling ConfigureAwait
.
src/AdvisoryConsumer.cs
Outdated
@@ -44,7 +45,7 @@ internal AdvisoryConsumer(Connection connection, ConsumerId consumerId) : base() | |||
this.info.NoLocal = true; | |||
|
|||
this.connection.AddDispatcher(consumerId, this); | |||
this.connection.SyncRequest(this.info); | |||
this.connection.SyncRequestAsync(this.info).GetAsyncResult(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From calling stack it seems this is used in async flow, but you're blocking it here. I guess this should be changed, so first, you asynchronously create a resource and then inject it here via the constructor.
@@ -66,7 +67,7 @@ internal void Dispose() | |||
} | |||
} | |||
|
|||
public void Dispatch(MessageDispatch messageDispatch) | |||
public Task Dispatch_Async(MessageDispatch messageDispatch) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't like this dash here. Why not call this method just DispatchAsync
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i recall there was some naming collision
src/Transport/ResponseCorrelator.cs
Outdated
if (timeout.TotalMilliseconds > 0) | ||
{ | ||
CancellationTokenSource ct = new CancellationTokenSource(timeout); | ||
ct.Token.Register(() => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Register returns CancellationTokenRegistration
that also should be disposed when you're dealt with this request.
public virtual void ClearBody() | ||
public Task AcknowledgeAsync() | ||
{ | ||
Acknowledge(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is blocking. Shouldn't be as the internal implementation isn't.
/// Goal of this is to replace lock(syncRoot) for sync and async methods, and also have Wait and Pulse(All) capabilities | ||
/// Relies on AsyncLocal construct, and should be valid along the flow of executioncontext | ||
/// </summary> | ||
public class NmsSynchronizationMonitor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder why you didn't use sth like https://www.nuget.org/packages/Nito.AsyncEx/ and reimplemented this from scratch yourself. 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
afaik this is non reentrant, non resursive, the other one tries to be
@@ -25,6 +26,6 @@ namespace Apache.NMS.ActiveMQ | |||
/// </summary> | |||
public interface IDispatcher | |||
{ | |||
void Dispatch(MessageDispatch messageDispatch); | |||
Task Dispatch_Async(MessageDispatch messageDispatch); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not DispatchAsync
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there was collision with Session.DispatchAsync
namespace Apache.NMS.ActiveMQ | ||
{ | ||
public interface ISynchronization | ||
{ | ||
/// <summary> | ||
/// Called before a commit or rollback is applied. | ||
/// </summary> | ||
void BeforeEnd(); | ||
Task BeforeEndAsync(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I hope nobody was using this public API.
@@ -667,7 +693,8 @@ public void DeliverAcks() | |||
this.executor = new ThreadPoolExecutor(); | |||
} | |||
|
|||
this.executor.QueueUserWorkItem(AsyncDeliverAck, ack); | |||
// queue to sync pool so we have to make it sync method | |||
this.executor.QueueUserWorkItem( (obj) => AsyncDeliverAckAsync(obj).GetAsyncResult(), ack); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's pretty strange, but I guess it makes sense.
src/Transport/FutureResponse.cs
Outdated
|
||
latch.countDown(); | ||
} | ||
set => SetResult(value); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not safe, it will throw an exception when you call SetCanceled
or SetException
first. I'd change it here and in other places to Try* equivalent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I reviewed this PR as thoroughly as I could. I left some suggestions and remarks, but I don't know this code-base well enough to officially give you the green light.
@Havret i think you should be able to give green light, as long as all pre-existing tests pass and your comments are addressed around the new changes. |
Hello @killnine perhaps you could also have a look ? |
There was a little update to Task runners, Here is my tests run on my local machine, except dtc, the same tests are failing on the pre-2.0/old version. Here's the report: When run in batch (all at once) sometimes these fail: InactivityMonitorTest.TestWriteMessageFail, AMQNET375Test.TestZeroPRefetchConsumerGetsAllMessages) but then after running separately they turn green. |
@@ -28,6 +28,8 @@ public class CompositeTaskRunner : TaskRunner | |||
{ | |||
private readonly Mutex mutex = new Mutex(); | |||
private readonly Thread theThread = null; | |||
private AsyncLocal<bool> workExecutionContextCurrentlyProcessing = new AsyncLocal<bool>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the reasoning behind this flag? What is it supposed to do?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess it's ok, if this helps the tests pass, but the reasoning behind this implementation exceeds my technical understanding of this library.
+1 from me, with same test coverage as previous. Main thing here is to note this will be first pass of 2.0 support, so its expected there maybe issues raised from end users, and we simply will have to bug fix those. |
NMS 2.0 async await, it can be further improved/made async,
not all features are implemented, for example shared consumers, delivery delay