Skip to content

Commit

Permalink
Allow copying value/sequence message (Azure#25048)
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLove-msft authored Nov 1, 2021
1 parent b304a66 commit 363215c
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,25 @@ public ServiceBusMessage(BinaryData body) : this(body?.ToMemory() ?? default)
public ServiceBusMessage(ServiceBusReceivedMessage receivedMessage)
{
Argument.AssertNotNull(receivedMessage, nameof(receivedMessage));
if (!receivedMessage.AmqpMessage.Body.TryGetData(out IEnumerable<ReadOnlyMemory<byte>> dataBody))

AmqpMessageBody body = null;
if (receivedMessage.AmqpMessage.Body.TryGetData(out IEnumerable<ReadOnlyMemory<byte>> dataBody))
{
body = AmqpMessageBody.FromData(MessageBody.FromReadOnlyMemorySegments(dataBody));
}
else if (receivedMessage.AmqpMessage.Body.TryGetValue(out object valueBody))
{
body = AmqpMessageBody.FromValue(valueBody);
}
else if (receivedMessage.AmqpMessage.Body.TryGetSequence(out IEnumerable<IList<object>> sequenceBody))
{
body = AmqpMessageBody.FromSequence(sequenceBody);
}
else
{
throw new NotSupportedException($"{receivedMessage.AmqpMessage.Body.BodyType} is not a supported message body type.");
}

AmqpMessageBody body = new AmqpMessageBody(MessageBody.FromReadOnlyMemorySegments(dataBody));
AmqpMessage = new AmqpAnnotatedMessage(body);

// copy properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,10 @@ public async Task CanSendValueSection(object value)
Assert.That(
() => received.Body,
Throws.InstanceOf<NotSupportedException>());

var sendable = new ServiceBusMessage(received);
sendable.GetRawAmqpMessage().Body.TryGetValue(out var sendData);
Assert.AreEqual(value, sendData);
}
}

Expand Down Expand Up @@ -433,6 +437,10 @@ public async Task CanSendSequenceSection(IEnumerable<IList<object>> sequence)
Assert.That(
() => received.Body,
Throws.InstanceOf<NotSupportedException>());

var sendable = new ServiceBusMessage(received);
sendable.GetRawAmqpMessage().Body.TryGetSequence(out var sendData);
Assert.AreEqual(sequence, sendData);
}
}

Expand Down

0 comments on commit 363215c

Please sign in to comment.