Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Turning of batching for incoming amqp messages to gain faster feedback to the sender #6441

Merged
merged 4 commits into from
Jun 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/EnvironmentVariables.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
|-------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------|-------------------------------|
| AmqpSettings__Enabled | Whether the AMQP protocol head should be enabled | bool | true |
| AmqpSettings__Port | The port for the AMQP protocol head to listen on | int32 | 5671 |
| AmqpSettings__DelayedBatchingEnabled | Enable to wait for subsequent packets to batch them, similar to Nagle for TCP | bool | false |
| AuthenticationMode | Determines who performs authentication | Scope, Cloud, CloudAndScope ([Cloud AuthenticationMode not supported in production](#cloudauthnote)) | Scope |
| BackupFolder | Path to place the backup EdgeHub database directory | string | TempPath of the current OS |
| CacheTokens | Whether client authentication tokens are saved to disk | bool | false |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ public EventsLinkHandler(
IDictionary<string, string> boundVariables,
IConnectionHandler connectionHandler,
IMessageConverter<AmqpMessage> messageConverter,
IMetadataStore metadataStore)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter, metadataStore)
IMetadataStore metadataStore,
bool delayedBatchingEnabled)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter, metadataStore, delayedBatchingEnabled)
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,16 @@ public class LinkHandlerProvider : ILinkHandlerProvider
readonly IIdentityProvider identityProvider;
readonly IMetadataStore metadataStore;
readonly IDictionary<(UriPathTemplate Template, bool IsReceiver), LinkType> templatesList;
readonly bool delayedBatchingEnabled;

public LinkHandlerProvider(
IMessageConverter<AmqpMessage> messageConverter,
IMessageConverter<AmqpMessage> twinMessageConverter,
IMessageConverter<AmqpMessage> methodMessageConverter,
IIdentityProvider identityProvider,
IMetadataStore metadataStore)
: this(messageConverter, twinMessageConverter, methodMessageConverter, identityProvider, metadataStore, DefaultTemplatesList)
IMetadataStore metadataStore,
bool delayedBatchingEnabled)
: this(messageConverter, twinMessageConverter, methodMessageConverter, identityProvider, metadataStore, DefaultTemplatesList, delayedBatchingEnabled)
{
}

Expand All @@ -53,14 +55,16 @@ public LinkHandlerProvider(
IMessageConverter<AmqpMessage> methodMessageConverter,
IIdentityProvider identityProvider,
IMetadataStore metadataStore,
IDictionary<(UriPathTemplate Template, bool IsReceiver), LinkType> templatesList)
IDictionary<(UriPathTemplate Template, bool IsReceiver), LinkType> templatesList,
bool delayedBatchingEnabled)
{
this.messageConverter = Preconditions.CheckNotNull(messageConverter, nameof(messageConverter));
this.twinMessageConverter = Preconditions.CheckNotNull(twinMessageConverter, nameof(twinMessageConverter));
this.methodMessageConverter = Preconditions.CheckNotNull(methodMessageConverter, nameof(methodMessageConverter));
this.identityProvider = Preconditions.CheckNotNull(identityProvider, nameof(identityProvider));
this.metadataStore = Preconditions.CheckNotNull(metadataStore, nameof(metadataStore));
this.templatesList = Preconditions.CheckNotNull(templatesList, nameof(templatesList));
this.delayedBatchingEnabled = delayedBatchingEnabled;
}

public ILinkHandler Create(IAmqpLink link, Uri uri)
Expand Down Expand Up @@ -89,7 +93,7 @@ internal ILinkHandler GetLinkHandler(LinkType linkType, IAmqpLink link, Uri uri,
return new DeviceBoundLinkHandler(identity, link as ISendingAmqpLink, uri, boundVariables, connectionHandler, this.messageConverter, this.metadataStore);

case LinkType.Events:
return new EventsLinkHandler(identity, link as IReceivingAmqpLink, uri, boundVariables, connectionHandler, this.messageConverter, this.metadataStore);
return new EventsLinkHandler(identity, link as IReceivingAmqpLink, uri, boundVariables, connectionHandler, this.messageConverter, this.metadataStore, this.delayedBatchingEnabled);

case LinkType.ModuleMessages:
return new ModuleMessageLinkHandler(identity, link as ISendingAmqpLink, uri, boundVariables, connectionHandler, this.messageConverter, this.metadataStore);
Expand All @@ -98,10 +102,10 @@ internal ILinkHandler GetLinkHandler(LinkType linkType, IAmqpLink link, Uri uri,
return new MethodSendingLinkHandler(identity, link as ISendingAmqpLink, uri, boundVariables, connectionHandler, this.methodMessageConverter, this.metadataStore);

case LinkType.MethodReceiving:
return new MethodReceivingLinkHandler(identity, link as IReceivingAmqpLink, uri, boundVariables, connectionHandler, this.methodMessageConverter, this.metadataStore);
return new MethodReceivingLinkHandler(identity, link as IReceivingAmqpLink, uri, boundVariables, connectionHandler, this.methodMessageConverter, this.metadataStore, this.delayedBatchingEnabled);

case LinkType.TwinReceiving:
return new TwinReceivingLinkHandler(identity, link as IReceivingAmqpLink, uri, boundVariables, connectionHandler, this.twinMessageConverter, this.metadataStore);
return new TwinReceivingLinkHandler(identity, link as IReceivingAmqpLink, uri, boundVariables, connectionHandler, this.twinMessageConverter, this.metadataStore, this.delayedBatchingEnabled);

case LinkType.TwinSending:
return new TwinSendingLinkHandler(identity, link as ISendingAmqpLink, uri, boundVariables, connectionHandler, this.twinMessageConverter, this.metadataStore);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ public MethodReceivingLinkHandler(
IDictionary<string, string> boundVariables,
IConnectionHandler connectionHandler,
IMessageConverter<AmqpMessage> messageConverter,
IMetadataStore metadataStore)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter, metadataStore)
IMetadataStore metadataStore,
bool delayedBatchingEnabled)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter, metadataStore, delayedBatchingEnabled)
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Amqp.LinkHandlers
public abstract class ReceivingLinkHandler : LinkHandler, IReceivingLinkHandler
{
readonly ActionBlock<AmqpMessage> sendMessageProcessor;
readonly bool delayedBatchingEnabled;

protected ReceivingLinkHandler(
IIdentity identity,
Expand All @@ -26,12 +27,14 @@ protected ReceivingLinkHandler(
IDictionary<string, string> boundVariables,
IConnectionHandler connectionHandler,
IMessageConverter<AmqpMessage> messageConverter,
IMetadataStore metadataStore)
IMetadataStore metadataStore,
bool delayedBatchingEnabled)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter, metadataStore)
{
Preconditions.CheckArgument(link.IsReceiver, $"Link {requestUri} cannot receive");
this.ReceivingLink = link;
this.sendMessageProcessor = new ActionBlock<AmqpMessage>(this.ProcessMessageAsync);
this.delayedBatchingEnabled = delayedBatchingEnabled;
}

protected IReceivingAmqpLink ReceivingLink { get; }
Expand Down Expand Up @@ -85,12 +88,12 @@ internal async Task ProcessMessageAsync(AmqpMessage amqpMessage)
try
{
await this.OnMessageReceived(amqpMessage);
((IReceivingAmqpLink)this.Link).DisposeMessage(amqpMessage, AmqpConstants.AcceptedOutcome, true, true);
((IReceivingAmqpLink)this.Link).DisposeMessage(amqpMessage, AmqpConstants.AcceptedOutcome, true, this.delayedBatchingEnabled);
}
catch (Exception e) when (!e.IsFatal())
{
Events.ErrorProcessingMessage(e, this);
((IReceivingAmqpLink)this.Link).DisposeMessage(amqpMessage, AmqpConstants.RejectedOutcome, true, true);
((IReceivingAmqpLink)this.Link).DisposeMessage(amqpMessage, AmqpConstants.RejectedOutcome, true, this.delayedBatchingEnabled);
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ public TwinReceivingLinkHandler(
IDictionary<string, string> boundVariables,
IConnectionHandler connectionHandler,
IMessageConverter<AmqpMessage> messageConverter,
IMetadataStore metadataStore)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter, metadataStore)
IMetadataStore metadataStore,
bool delayedBatchingEnabled)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter, metadataStore, delayedBatchingEnabled)
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ void RegisterAmqpModule(ContainerBuilder builder)
{
IConfiguration amqpSettings = this.configuration.GetSection("amqpSettings");
bool clientCertAuthEnabled = this.configuration.GetValue(Constants.ConfigKey.EdgeHubClientCertAuthEnabled, false);
builder.RegisterModule(new AmqpModule(amqpSettings["scheme"], amqpSettings.GetValue<ushort>("port"), this.serverCertificate, this.iotHubHostname, clientCertAuthEnabled, this.sslProtocols));
builder.RegisterModule(new AmqpModule(amqpSettings["scheme"], amqpSettings.GetValue<ushort>("port"), this.serverCertificate, this.iotHubHostname, clientCertAuthEnabled, this.sslProtocols, amqpSettings.GetValue<bool>("delayedBatchingEnabled")));
}

void RegisterMqttModule(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,24 @@ public class AmqpModule : Module
readonly string iotHubHostName;
readonly bool clientCertAuthAllowed;
readonly SslProtocols sslProtocols;
readonly bool delayedBatchingEnabled;

public AmqpModule(
string scheme,
int port,
X509Certificate2 tlsCertificate,
string iotHubHostName,
bool clientCertAuthAllowed,
SslProtocols sslProtocols)
SslProtocols sslProtocols,
bool delayedBatchingEnabled)
{
this.scheme = Preconditions.CheckNonWhiteSpace(scheme, nameof(scheme));
this.port = Preconditions.CheckRange(port, 0, ushort.MaxValue, nameof(port));
this.tlsCertificate = Preconditions.CheckNotNull(tlsCertificate, nameof(tlsCertificate));
this.iotHubHostName = Preconditions.CheckNonWhiteSpace(iotHubHostName, nameof(iotHubHostName));
this.clientCertAuthAllowed = clientCertAuthAllowed;
this.sslProtocols = sslProtocols;
this.delayedBatchingEnabled = delayedBatchingEnabled;
}

protected override void Load(ContainerBuilder builder)
Expand Down Expand Up @@ -72,7 +75,7 @@ protected override void Load(ContainerBuilder builder)
IMessageConverter<AmqpMessage> directMethodMessageConverter = new AmqpDirectMethodMessageConverter();
var identityProvider = c.Resolve<IIdentityProvider>();
var metadataStore = await c.Resolve<Task<IMetadataStore>>();
ILinkHandlerProvider linkHandlerProvider = new LinkHandlerProvider(messageConverter, twinMessageConverter, directMethodMessageConverter, identityProvider, metadataStore);
ILinkHandlerProvider linkHandlerProvider = new LinkHandlerProvider(messageConverter, twinMessageConverter, directMethodMessageConverter, identityProvider, metadataStore, this.delayedBatchingEnabled);
return linkHandlerProvider;
})
.As<Task<ILinkHandlerProvider>>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void CreateTest()
var metadataStore = Mock.Of<IMetadataStore>();

// Act
ILinkHandler linkHandler = new EventsLinkHandler(identity, amqpLink, requestUri, boundVariables, connectionHandler.Object, messageConverter, metadataStore);
ILinkHandler linkHandler = new EventsLinkHandler(identity, amqpLink, requestUri, boundVariables, connectionHandler.Object, messageConverter, metadataStore, false);

// Assert
Assert.NotNull(linkHandler);
Expand Down Expand Up @@ -85,7 +85,7 @@ public async Task SendMessageTest()
amqpMessage.Properties.ContentType = "application/json";
amqpMessage.Properties.ContentEncoding = "utf-8";

ILinkHandler linkHandler = new EventsLinkHandler(identity, amqpLink, requestUri, boundVariables, connectionHandler, messageConverter, metadataStore);
ILinkHandler linkHandler = new EventsLinkHandler(identity, amqpLink, requestUri, boundVariables, connectionHandler, messageConverter, metadataStore, false);

// Act
await linkHandler.OpenAsync(TimeSpan.FromSeconds(30));
Expand Down Expand Up @@ -167,7 +167,7 @@ public async Task SendMessageBatchTest()
using (AmqpMessage amqpMessage = GetBatchedMessage(contents))
{
amqpMessage.MessageFormat = AmqpConstants.AmqpBatchedMessageFormat;
ILinkHandler linkHandler = new EventsLinkHandler(identity, amqpLink, requestUri, boundVariables, connectionHandler, messageConverter, metadataStore);
ILinkHandler linkHandler = new EventsLinkHandler(identity, amqpLink, requestUri, boundVariables, connectionHandler, messageConverter, metadataStore, false);

// Act
await linkHandler.OpenAsync(TimeSpan.FromSeconds(30));
Expand Down Expand Up @@ -245,7 +245,7 @@ public async Task SendLargeMessageThrowsTest()
using (AmqpMessage amqpMessage = AmqpMessage.Create(new MemoryStream(new byte[800000]), false))
{
amqpMessage.ApplicationProperties.Map["LargeProp"] = new int[600000];
ILinkHandler linkHandler = new EventsLinkHandler(identity, amqpLink, requestUri, boundVariables, connectionHandler, messageConverter, metadataStore);
ILinkHandler linkHandler = new EventsLinkHandler(identity, amqpLink, requestUri, boundVariables, connectionHandler, messageConverter, metadataStore, false);

// Act
await linkHandler.OpenAsync(TimeSpan.FromSeconds(30));
Expand Down Expand Up @@ -336,7 +336,7 @@ public async Task SetProductInfoTest()
IEntityStore<string, string> store = storeProvider.GetEntityStore<string, string>("productInfo");
var metadataStore = new MetadataStore(store, edgeProductInfo);

ILinkHandler linkHandler = new EventsLinkHandler(identity, amqpLink, requestUri, boundVariables, connectionHandler, messageConverter, metadataStore);
ILinkHandler linkHandler = new EventsLinkHandler(identity, amqpLink, requestUri, boundVariables, connectionHandler, messageConverter, metadataStore, false);

// Act
await linkHandler.OpenAsync(TimeSpan.FromSeconds(1));
Expand Down Expand Up @@ -389,7 +389,7 @@ public async Task SetModelIdTest()
IEntityStore<string, string> store = storeProvider.GetEntityStore<string, string>("modelId");
var metadataStore = new MetadataStore(store, "testProductInfo");

ILinkHandler linkHandler = new EventsLinkHandler(identity, amqpLink, requestUri, boundVariables, connectionHandler, messageConverter, metadataStore);
ILinkHandler linkHandler = new EventsLinkHandler(identity, amqpLink, requestUri, boundVariables, connectionHandler, messageConverter, metadataStore, false);

// Act
await linkHandler.OpenAsync(TimeSpan.FromSeconds(1));
Expand Down
Loading