Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AMQNET-637 NMS 2.0 #18

Merged
merged 4 commits into from
Jun 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions src/AdvisoryConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
*/

using System;

using System.Threading.Tasks;
using Apache.NMS.ActiveMQ.Util;
using Apache.NMS.ActiveMQ.Commands;
using Apache.NMS.ActiveMQ.Util.Synchronization;

namespace Apache.NMS.ActiveMQ
{
Expand All @@ -34,17 +35,23 @@ internal class AdvisoryConsumer : IDispatcher
private bool closed = false;
private int deliveredCounter = 0;

internal AdvisoryConsumer(Connection connection, ConsumerId consumerId) : base()
internal static async Task<AdvisoryConsumer> CreateAsync(Connection connection, ConsumerId consumerId)
{
var advisoryConsumer = new AdvisoryConsumer(connection, consumerId);
connection.AddDispatcher(consumerId, advisoryConsumer);
await connection.SyncRequestAsync(advisoryConsumer.info).Await();

return advisoryConsumer;
}

private AdvisoryConsumer(Connection connection, ConsumerId consumerId) : base()
{
this.connection = connection;
this.info = new ConsumerInfo();
this.info.ConsumerId = consumerId;
this.info.Destination = AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
this.info.PrefetchSize = 1000;
this.info.NoLocal = true;

this.connection.AddDispatcher(consumerId, this);
this.connection.SyncRequest(this.info);
}

internal void Dispose()
Expand All @@ -66,7 +73,7 @@ internal void Dispose()
}
}

public void Dispatch(MessageDispatch messageDispatch)
public Task Dispatch_Async(MessageDispatch messageDispatch)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like this dash here. Why not call this method just DispatchAsync.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i recall there was some naming collision

{
// Auto ack messages when we reach 75% of the prefetch
deliveredCounter++;
Expand Down Expand Up @@ -101,6 +108,8 @@ public void Dispatch(MessageDispatch messageDispatch)
// This can happen across networks
Tracer.Debug("Unexpected message was dispatched to the AdvisoryConsumer: " + messageDispatch);
}

return Task.CompletedTask;
}

private void ProcessDestinationInfo(DestinationInfo destInfo)
Expand Down
12 changes: 11 additions & 1 deletion src/Commands/ActiveMQBytesMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,17 @@ private void StoreContent()
}
}

/// <summary>
public override bool IsBodyAssignableTo(Type type)
{
return Content != null && type.IsAssignableFrom(typeof(byte[]));
}

protected override T GetBody<T>()
{
return (T)(object)Content;
}

/// <summary>
/// Used when the message compression is enabled to track how many bytes
/// the EndianBinaryWriter actually writes to the stream before compression
/// so that the receiving client can read off the real bodylength from the
Expand Down
11 changes: 11 additions & 0 deletions src/Commands/ActiveMQMapMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*/

using System;
using System.IO;
using Apache.NMS.ActiveMQ.OpenWire;
using Apache.NMS.Util;
Expand Down Expand Up @@ -112,5 +113,15 @@ public override void BeforeMarshall(OpenWireFormat wireFormat)

base.BeforeMarshall(wireFormat);
}

public override bool IsBodyAssignableTo(Type type)
{
return Content != null && type.IsAssignableFrom(typeof(IPrimitiveMap));
}

protected override T GetBody<T>()
{
return (T)Body;
}
}
}
31 changes: 30 additions & 1 deletion src/Commands/ActiveMQMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/

using System;
using System.Threading.Tasks;
using Apache.NMS.ActiveMQ.OpenWire;
using Apache.NMS.ActiveMQ.State;
using Apache.NMS.Util;
Expand Down Expand Up @@ -91,7 +92,13 @@ public void Acknowledge()
Acknowledger(this);
}

public virtual void ClearBody()
public Task AcknowledgeAsync()
{
Acknowledge();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is blocking. Shouldn't be as the internal implementation isn't.

return Task.CompletedTask;
}

public virtual void ClearBody()
{
this.ReadOnlyBody = false;
this.Content = null;
Expand All @@ -104,6 +111,26 @@ public virtual void ClearProperties()
this.Properties.Clear();
}

public T Body<T>()
{
if (IsBodyAssignableTo(typeof(T)))
{
return GetBody<T>();
}

throw new MessageFormatException("Message body cannot be read as type: " + typeof(T));
}

public virtual bool IsBodyAssignableTo(Type type)
{
return false;
}

protected virtual T GetBody<T>()
{
throw new NotImplementedException();
}

protected void FailIfReadOnlyBody()
{
if(ReadOnlyBody == true)
Expand Down Expand Up @@ -326,6 +353,8 @@ public string NMSType
set { Type = value; }
}

public DateTime NMSDeliveryTime { get => throw new NotImplementedException(); set => throw new NotImplementedException(); } //ASYNC TODO

#endregion

#region NMS Extension headers
Expand Down
12 changes: 12 additions & 0 deletions src/Commands/ActiveMQObjectMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*/

using System;
using System.IO;

#if !(PocketPC||NETCF||NETCF_2_0)
Expand Down Expand Up @@ -94,6 +95,17 @@ public object Body
}
}

public override bool IsBodyAssignableTo(Type type)
{
return Content != null && type.IsInstanceOfType(Body);
}

protected override T GetBody<T>()
{
return (T)Body;
}


#if !(PocketPC||NETCF||NETCF_2_0)
public override void BeforeMarshall(OpenWireFormat wireFormat)
{
Expand Down
5 changes: 5 additions & 0 deletions src/Commands/ActiveMQStreamMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,11 @@ private void StoreContent()
this.byteBuffer = null;
}
}

public override bool IsBodyAssignableTo(Type type)
{
return false;
}
}
}

7 changes: 7 additions & 0 deletions src/Commands/ActiveMQTempDestination.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/

using System;
using System.Threading.Tasks;

namespace Apache.NMS.ActiveMQ.Commands
{
Expand Down Expand Up @@ -85,6 +86,12 @@ public override Object Clone()
return o;
}

public Task DeleteAsync()
{
Delete();
return Task.CompletedTask;
}

public void Delete()
{
if(this.connection != null)
Expand Down
13 changes: 12 additions & 1 deletion src/Commands/ActiveMQTextMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,18 @@ public override int Size()
}

return base.Size();
}
}

public override bool IsBodyAssignableTo(Type type)
{
return Content != null && type.IsAssignableFrom(type);
}

protected override T GetBody<T>()
{
return (T)(object)Text;
}
}

}

Loading