Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AMQNET-637 NMS 2.0 #18

Merged
merged 4 commits into from
Jun 6, 2022
Merged

AMQNET-637 NMS 2.0 #18

merged 4 commits into from
Jun 6, 2022

Conversation

lukeabsent
Copy link
Contributor

NMS 2.0 async await, it can be further improved/made async,

not all features are implemented, for example shared consumers, delivery delay

@lukeabsent lukeabsent marked this pull request as ready for review December 30, 2021 15:10
TaskCompletionSource<Response> taskCompletionSource = new TaskCompletionSource<Response>(TaskCreationOptions.RunContinuationsAsynchronously);
if (timeout.TotalMilliseconds > 0)
{
CancellationTokenSource ct = new CancellationTokenSource(timeout);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CancellationTokenSource is disposable. There is a resource leek here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure, but treid to update it anyway

taskCompletionSource.TrySetException(new RequestTimedOutException(timeout));
}, false);
}

FutureResponse future = AsyncRequest(command);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This FutureResponse is some Java variation around TaskCompletionSource, I'd drop it altogether. Is doesn't add any value, but complicates logic dramatically.

Response response = future.Response;
return response;

_ = future.Task.ContinueWith(t =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic you are applying here should be applied as part of OnCommand. You basically should resolve promise there.

{
if (t.IsCompleted)
{
taskCompletionSource.SetResult(t.Result);
Copy link
Contributor

@Havret Havret Jan 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use TrySet* instead of Set* on tcs. Set may throw an InvalidOperationException, where value is already set.

@@ -18,7 +18,9 @@
using System;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd remove most of the synchronisation blocks and replace Hashtable.Synchronized(new Hashtable()) with thread safe ConcurrentDictionary.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I gave it a try but without this locking, had some group of tests failing.

@@ -25,8 +26,13 @@ namespace Apache.NMS.ActiveMQ.Transport
/// <summary>
/// Handles asynchronous responses
/// </summary>
public class FutureResponse
public class FutureResponse : TaskCompletionSource<Response>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So it seems you have 2 TCSs to handle a single request-reply. One should be enough.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seemed so : ), did a little refactoring, its bit more lean now

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Havret, so once you have some time please look again at this pr

Copy link
Contributor

@Havret Havret left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd take a step back and rethink handling of async operations.

}
});

return taskCompletionSource.Task;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should handle scenario when the operation is cancelled and remove future or TCS from list of pending operations.


public static T GetAsyncResult<T>(this Task<T> task)
{
return task.Await().GetAwaiter().GetResult();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When you're using GetResult there is no await so there is no point of calling ConfigureAwait.

@@ -44,7 +45,7 @@ internal AdvisoryConsumer(Connection connection, ConsumerId consumerId) : base()
this.info.NoLocal = true;

this.connection.AddDispatcher(consumerId, this);
this.connection.SyncRequest(this.info);
this.connection.SyncRequestAsync(this.info).GetAsyncResult();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From calling stack it seems this is used in async flow, but you're blocking it here. I guess this should be changed, so first, you asynchronously create a resource and then inject it here via the constructor.

@@ -66,7 +67,7 @@ internal void Dispose()
}
}

public void Dispatch(MessageDispatch messageDispatch)
public Task Dispatch_Async(MessageDispatch messageDispatch)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like this dash here. Why not call this method just DispatchAsync.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i recall there was some naming collision

if (timeout.TotalMilliseconds > 0)
{
CancellationTokenSource ct = new CancellationTokenSource(timeout);
ct.Token.Register(() =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Register returns CancellationTokenRegistration that also should be disposed when you're dealt with this request.

public virtual void ClearBody()
public Task AcknowledgeAsync()
{
Acknowledge();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is blocking. Shouldn't be as the internal implementation isn't.

/// Goal of this is to replace lock(syncRoot) for sync and async methods, and also have Wait and Pulse(All) capabilities
/// Relies on AsyncLocal construct, and should be valid along the flow of executioncontext
/// </summary>
public class NmsSynchronizationMonitor
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder why you didn't use sth like https://www.nuget.org/packages/Nito.AsyncEx/ and reimplemented this from scratch yourself. 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

afaik this is non reentrant, non resursive, the other one tries to be

@@ -25,6 +26,6 @@ namespace Apache.NMS.ActiveMQ
/// </summary>
public interface IDispatcher
{
void Dispatch(MessageDispatch messageDispatch);
Task Dispatch_Async(MessageDispatch messageDispatch);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not DispatchAsync?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there was collision with Session.DispatchAsync

namespace Apache.NMS.ActiveMQ
{
public interface ISynchronization
{
/// <summary>
/// Called before a commit or rollback is applied.
/// </summary>
void BeforeEnd();
Task BeforeEndAsync();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hope nobody was using this public API.

@@ -667,7 +693,8 @@ public void DeliverAcks()
this.executor = new ThreadPoolExecutor();
}

this.executor.QueueUserWorkItem(AsyncDeliverAck, ack);
// queue to sync pool so we have to make it sync method
this.executor.QueueUserWorkItem( (obj) => AsyncDeliverAckAsync(obj).GetAsyncResult(), ack);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's pretty strange, but I guess it makes sense.


latch.countDown();
}
set => SetResult(value);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not safe, it will throw an exception when you call SetCanceled or SetException first. I'd change it here and in other places to Try* equivalent.

Copy link
Contributor

@Havret Havret left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reviewed this PR as thoroughly as I could. I left some suggestions and remarks, but I don't know this code-base well enough to officially give you the green light.

@michaelandrepearce
Copy link
Contributor

@Havret i think you should be able to give green light, as long as all pre-existing tests pass and your comments are addressed around the new changes.

@lukeabsent
Copy link
Contributor Author

Hello @killnine perhaps you could also have a look ?

@lukeabsent
Copy link
Contributor Author

lukeabsent commented Apr 27, 2022

There was a little update to Task runners,

Here is my tests run on my local machine, except dtc, the same tests are failing on the pre-2.0/old version.
And as much as I can see the same situation was in here
#9 (comment)

Here's the report:
nms-openwire-test (27 tests) Failed: 27 tests failed
Apache.NMS.ActiveMQ.Test (27 tests) Failed: 27 tests failed
Commands (2 tests) Failed: 2 tests failed
ActiveMQMessageTest (net472) (1 test) Failed: One or more child tests had errors: 1 test failed
TestSetEmptyPropertyName Failed: Should have thrown exception
ActiveMQMessageTest (netcoreapp3.1) (1 test) Failed: One or more child tests had errors: 1 test failed
TestSetEmptyPropertyName Failed: Should have thrown exception
DtcBasicTransactionsTest (netcoreapp3.1) (2 tests) Failed: One or more child tests had errors: 2 tests failed
TestTransacteDequeueAndDbWrite Failed: wrong number of rows in DB
TestTransactedDBReadAndProduce Failed: wrong number of rows in DB
DtcConsumerTransactionsTest (netcoreapp3.1) (13 tests) Failed: One or more child tests had errors: 13 tests failed
TestConsumeWithDBInsertLogLocation Failed: Expected: 1
TestIterativeTransactedConsume Failed: wrong number of rows in DB
TestRecoverAfterFailOnTransactionAfterPrepareSent Failed: Expected: 0
TestRecoverAfterFailOnTransactionBeforePrepareSent Failed: Expected: 5
TestRecoverAfterRollbackFailWhenScopeAborted Failed: Expected: 5
TestRecoverAfterTransactionScopeAborted Failed: Expected: 5
TestRecoveryAfterCommitFailsAfterSent Failed: Expected: 0
TestRecoveryAfterCommitFailsBeforeSent Failed: Expected: 0
TestRedelivered Failed: System.PlatformNotSupportedException : This platform does not support distributed transactions.
TestRedeliveredCase2 Failed: System.PlatformNotSupportedException : This platform does not support distributed transactions.
TestRedeliveredCase3 Failed: System.PlatformNotSupportedException : This platform does not support distributed transactions.
TestRedeliveredNoComplete Failed: System.PlatformNotSupportedException : This platform does not support distributed transactions.
TestTransactedAsyncConsumption Failed: System.Transactions.TransactionAbortedException : The transaction has aborted.
DtcProducerTransactionsTest (netcoreapp3.1) (4 tests) Failed: One or more child tests had errors: 4 tests failed
TestIterativeTransactedProduceWithDBDelete Failed: wrong number of rows in DB
TestNoRecoverAfterFailOnTransactionWhenLogDeleted Failed: wrong number of rows in DB
TestRecoverAfterFailOnTransactionCommit Failed: wrong number of rows in DB
TestRecoverAfterFailOnTransactionPostCommitSend Failed: Expected: 5
MaxInactivityDurationTest (net472) (2 tests) Failed: One or more child tests had errors: 2 tests failed
TestInactivityMonitorThreadLeak (2 tests) Failed: One or more child tests had errors: 2 tests failed
TestInactivityMonitorThreadLeak(0) Failed: Handle count grew beyond maximum of 500 on iteration #25.
TestInactivityMonitorThreadLeak(1000) Failed: Handle count grew beyond maximum of 500 on iteration #19.
MaxInactivityDurationTest (netcoreapp3.1) (2 tests) Failed: One or more child tests had errors: 2 tests failed
TestInactivityMonitorThreadLeak (2 tests) Failed: One or more child tests had errors: 2 tests failed
TestInactivityMonitorThreadLeak(0) Failed: Handle count grew beyond maximum of 500 on iteration #44.
TestInactivityMonitorThreadLeak(1000) Failed: Handle count grew beyond maximum of 500 on iteration #26.
TempDestinationTest (net472) (1 test) Failed: One or more child tests had errors: 1 test failed
TestTmpQueueWorksUnderLoad Failed: Expected: <ActiveMQBytesMessage[ commandId = 0, responseRequired = False,
TempDestinationTest (netcoreapp3.1) (1 test) Failed: One or more child tests had errors: 1 test failed
TestTmpQueueWorksUnderLoad Failed: Expected: <ActiveMQBytesMessage[ commandId = 0, responseRequired = False,

When run in batch (all at once) sometimes these fail: InactivityMonitorTest.TestWriteMessageFail, AMQNET375Test.TestZeroPRefetchConsumerGetsAllMessages) but then after running separately they turn green.

@@ -28,6 +28,8 @@ public class CompositeTaskRunner : TaskRunner
{
private readonly Mutex mutex = new Mutex();
private readonly Thread theThread = null;
private AsyncLocal<bool> workExecutionContextCurrentlyProcessing = new AsyncLocal<bool>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reasoning behind this flag? What is it supposed to do?

Copy link
Contributor

@Havret Havret left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it's ok, if this helps the tests pass, but the reasoning behind this implementation exceeds my technical understanding of this library.

@michaelandrepearce
Copy link
Contributor

+1 from me, with same test coverage as previous. Main thing here is to note this will be first pass of 2.0 support, so its expected there maybe issues raised from end users, and we simply will have to bug fix those.

@Havret Havret merged commit fde8a9b into apache:main Jun 6, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants