Skip to content

Commit

Permalink
feat: upgrade to MQTTnet 4.3.6.1152
Browse files Browse the repository at this point in the history
  • Loading branch information
BEagle1984 committed Jun 5, 2024
1 parent 9fb9af2 commit 910dbc9
Show file tree
Hide file tree
Showing 20 changed files with 107 additions and 64 deletions.
4 changes: 2 additions & 2 deletions Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project>
<PropertyGroup Label="Package information">
<BaseVersionSuffix>-beta.2</BaseVersionSuffix>
<BaseVersion>4.4.2$(BaseVersionSuffix)</BaseVersion>
<BaseVersionSuffix>-beta.1</BaseVersionSuffix>
<BaseVersion>4.5.0$(BaseVersionSuffix)</BaseVersion>
<DatabasePackagesRevision>1</DatabasePackagesRevision>
<DatabasePackagesVersionSuffix>$(BaseVersionSuffix)</DatabasePackagesVersionSuffix>
</PropertyGroup>
Expand Down
7 changes: 7 additions & 0 deletions docs/releases.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ uid: releases

# Releases

## [4.5.1](https://github.com/BEagle1984/silverback/releases/tag/v4.4.1)

### What's new

* Upgrade to [MQTTnet 4.3.6.1152](https://github.com/chkr1011/MQTTnet/releases/tag/v4.3.6.1152)
* Minor changes to reduce the likelihood of a deadlock

## [4.4.1](https://github.com/BEagle1984/silverback/releases/tag/v4.4.1)

### What's new
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ namespace Silverback.Messaging.Messages
internal sealed class MessageStreamEnumerable<TMessage>
: IMessageStreamEnumerable<TMessage>, IMessageStreamEnumerable, IDisposable
{
[SuppressMessage("Usage", "CA2213:Disposable fields should be disposed", Justification = "False positive")]
private readonly SemaphoreSlim _writeSemaphore = new(1, 1);

[SuppressMessage("Usage", "CA2213:Disposable fields should be disposed", Justification = "False positive")]
private readonly SemaphoreSlim _readSemaphore = new(0, 1);

private readonly SemaphoreSlim _processedSemaphore = new(0, 1);
Expand Down Expand Up @@ -115,6 +117,7 @@ public void Dispose()
_abortCancellationTokenSource.Dispose();
}

[SuppressMessage("ReSharper", "SuspiciousLockOverSynchronizationPrimitive", Justification = "Intentional")]
private static void SafelyRelease(SemaphoreSlim semaphore)
{
lock (semaphore)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ private static string GetBrokerIdentifier(MqttClientOptions clientOptions) =>
clientOptions.ChannelOptions switch
{
MqttClientTcpOptions tcpOptions =>
$"{tcpOptions.Server.ToUpperInvariant()}-{tcpOptions.GetPort()}",
$"{tcpOptions.RemoteEndpoint}",
MqttClientWebSocketOptions socketOptions =>
socketOptions.Uri.ToUpperInvariant(),
_ => throw new InvalidOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
using System.Collections.Generic;
using MQTTnet.Client;

#pragma warning disable CS0618 // Type or member is obsolete -> MQTTnet marked some properties as obsolete

namespace Silverback.Messaging.Configuration.Mqtt.Comparers
{
internal sealed class MqttClientTcpOptionsEqualityComparer : IEqualityComparer<MqttClientTcpOptions>
Expand All @@ -25,7 +27,8 @@ public bool Equals(MqttClientTcpOptions? x, MqttClientTcpOptions? y)
if (x.GetType() != y.GetType())
return false;

return x.Server == y.Server &&
return Equals(x.RemoteEndpoint, y.RemoteEndpoint) &&
x.Server == y.Server &&
x.Port == y.Port &&
x.BufferSize == y.BufferSize &&
x.DualMode == y.DualMode &&
Expand All @@ -35,6 +38,7 @@ public bool Equals(MqttClientTcpOptions? x, MqttClientTcpOptions? y)
}

public int GetHashCode(MqttClientTcpOptions obj) => HashCode.Combine(
obj.RemoteEndpoint,
obj.Server,
obj.Port,
obj.BufferSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Diagnostics.CodeAnalysis;
using System.Net.Sockets;
using MQTTnet.Client;
using MQTTnet.Formatter;

Expand Down Expand Up @@ -314,10 +315,13 @@ IMqttClientConfigBuilder UseProxy(
/// <param name="port">
/// The server port. If not specified the default port 1883 will be used.
/// </param>
/// <param name="addressFamily">
/// The address family to use for the connection. The default is <see cref="AddressFamily.Unspecified" />.
/// </param>
/// <returns>
/// The <see cref="IMqttClientConfigBuilder" /> so that additional calls can be chained.
/// </returns>
IMqttClientConfigBuilder ConnectViaTcp(string server, int? port = null);
IMqttClientConfigBuilder ConnectViaTcp(string server, int? port = null, AddressFamily addressFamily = AddressFamily.Unspecified);

/// <summary>
/// Specifies the TCP connection settings.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,10 @@ public void Validate()

if (ChannelOptions is MqttClientTcpOptions tcpOptions)
{
if (string.IsNullOrEmpty(tcpOptions.Server))
throw new EndpointConfigurationException("ChannelOptions.Server cannot be empty.");
#pragma warning disable CS0618 // Type or member is obsolete
if (tcpOptions.RemoteEndpoint == null && string.IsNullOrEmpty(tcpOptions.Server))
#pragma warning restore CS0618 // Type or member is obsolete
throw new EndpointConfigurationException("ChannelOptions.RemoteEndpoint cannot be empty.");
}
else if (ChannelOptions is MqttClientWebSocketOptions webSocketOptions)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@

using System;
using System.Diagnostics.CodeAnalysis;
using System.Net;
using System.Net.Sockets;
using System.Security;
using Microsoft.Extensions.DependencyInjection;
using MQTTnet.Client;
using MQTTnet.Formatter;
using MQTTnet.Implementations;
using Silverback.Util;

namespace Silverback.Messaging.Configuration.Mqtt
Expand Down Expand Up @@ -59,8 +63,11 @@ public MqttClientConfigBuilder(MqttClientConfig baseConfig, IServiceProvider? se
ConnectViaTcp(
options =>
{
options.RemoteEndpoint = tcpOptions.RemoteEndpoint;
#pragma warning disable CS0618 // Type or member is obsolete
options.Port = tcpOptions.Port;
options.Server = tcpOptions.Server;
#pragma warning restore CS0618 // Type or member is obsolete
options.AddressFamily = tcpOptions.AddressFamily;
options.BufferSize = tcpOptions.BufferSize;
options.DualMode = tcpOptions.DualMode;
Expand Down Expand Up @@ -338,12 +345,12 @@ public IMqttClientConfigBuilder WithSessionExpiration(TimeSpan sessionExpiryInte
return this;
}

/// <inheritdoc cref="IMqttClientConfigBuilder.ConnectViaTcp(string,int?)" />
public IMqttClientConfigBuilder ConnectViaTcp(string server, int? port = null)
/// <inheritdoc cref="IMqttClientConfigBuilder.ConnectViaTcp(string,int?,AddressFamily)" />
public IMqttClientConfigBuilder ConnectViaTcp(string server, int? port = null, AddressFamily addressFamily = AddressFamily.Unspecified)
{
Check.NotNull(server, nameof(server));

_builder.WithTcpServer(server, port);
_builder.WithTcpServer(server, port, addressFamily);
return this;
}

Expand All @@ -353,6 +360,16 @@ public IMqttClientConfigBuilder ConnectViaTcp(Action<MqttClientTcpOptions> optio
Check.NotNull(optionsAction, nameof(optionsAction));

_builder.WithTcpServer(optionsAction);

// MQTTnet is not properly setting the remote endpoint when using this method
var options = new MqttClientTcpOptions();
optionsAction.Invoke(options);

if (options.RemoteEndpoint is DnsEndPoint dnsEndPoint)
ConnectViaTcp(dnsEndPoint.Host, dnsEndPoint.Port, options.AddressFamily);
else if (options.RemoteEndpoint is IPEndPoint ipEndPoint)
ConnectViaTcp(ipEndPoint.Address.ToString(), ipEndPoint.Port, options.AddressFamily);

return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ This package contains an implementation of Silverback.Integration for MQTT.</Des
</PropertyGroup>

<ItemGroup>
<PackageReference Include="MQTTnet" Version="4.3.2.930" />
<PackageReference Include="MQTTnet" Version="4.3.6.1152" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Collections.Generic;
using System.Net;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -37,7 +38,7 @@ public class MqttLoggerExtensionsTests
{
ChannelOptions = new MqttClientTcpOptions
{
Server = "test-server"
RemoteEndpoint = new DnsEndPoint("test-server", 4242)
}
}
};
Expand Down Expand Up @@ -96,15 +97,14 @@ public void LogConnectError_Logged()
ClientId = "test-client",
ChannelOptions = new MqttClientTcpOptions
{
Server = "mqtt",
Port = 1234
RemoteEndpoint = new DnsEndPoint("mqtt", 1234)
}
},
Substitute.For<IBrokerCallbacksInvoker>(),
_silverbackLogger);

var expectedMessage =
"Error occurred connecting to the MQTT broker. | clientId: test-client, broker: mqtt:1234";
"Error occurred connecting to the MQTT broker. | clientId: test-client, broker: Unspecified/mqtt:1234";

_silverbackLogger.LogConnectError(mqttClientWrapper, new MqttCommunicationException("test"));

Expand Down Expand Up @@ -153,16 +153,15 @@ public void LogConnectionLost_Logged()
ClientId = "test-client",
ChannelOptions = new MqttClientTcpOptions
{
Server = "mqtt",
Port = 1234
RemoteEndpoint = new DnsEndPoint("mqtt", 1234)
}
},
Substitute.For<IBrokerCallbacksInvoker>(),
_silverbackLogger);

var expectedMessage =
"Connection with the MQTT broker lost. The client will try to reconnect. | " +
"clientId: test-client, broker: mqtt:1234";
"clientId: test-client, broker: Unspecified/mqtt:1234";

_silverbackLogger.LogConnectionLost(mqttClientWrapper);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// This code is licensed under MIT license (see LICENSE file for details)

using System;
using System.Net;
using FluentAssertions;
using Microsoft.Extensions.DependencyInjection;
using MQTTnet.Client;
Expand Down Expand Up @@ -43,7 +44,7 @@ public void GetClient_ProducersWithSameClientConfig_SameClientReturned()
{
ChannelOptions = new MqttClientTcpOptions
{
Server = "mqtt-server"
RemoteEndpoint = new DnsEndPoint("test-server", 4242)
}
};

Expand Down Expand Up @@ -81,7 +82,7 @@ public void GetClient_ProducersWithEqualClientConfig_SameClientReturned()
ClientId = "client1",
ChannelOptions = new MqttClientTcpOptions
{
Server = "mqtt-server"
RemoteEndpoint = new DnsEndPoint("test-server", 4242)
}
}
});
Expand All @@ -93,7 +94,7 @@ public void GetClient_ProducersWithEqualClientConfig_SameClientReturned()
ClientId = "client1",
ChannelOptions = new MqttClientTcpOptions
{
Server = "mqtt-server"
RemoteEndpoint = new DnsEndPoint("test-server", 4242)
}
}
});
Expand Down Expand Up @@ -121,7 +122,7 @@ public void GetClient_ProducerAndConsumerWithEqualClientConfig_SameClientReturne
ClientId = "client1",
ChannelOptions = new MqttClientTcpOptions
{
Server = "mqtt-server"
RemoteEndpoint = new DnsEndPoint("test-server", 4242)
}
}
});
Expand All @@ -133,7 +134,7 @@ public void GetClient_ProducerAndConsumerWithEqualClientConfig_SameClientReturne
ClientId = "client1",
ChannelOptions = new MqttClientTcpOptions
{
Server = "mqtt-server"
RemoteEndpoint = new DnsEndPoint("test-server", 4242)
}
}
});
Expand Down Expand Up @@ -161,7 +162,7 @@ public void GetClient_ConsumerAndProducerWithEqualClientConfig_SameClientReturne
ClientId = "client1",
ChannelOptions = new MqttClientTcpOptions
{
Server = "mqtt-server"
RemoteEndpoint = new DnsEndPoint("test-server", 4242)
}
}
});
Expand All @@ -173,7 +174,7 @@ public void GetClient_ConsumerAndProducerWithEqualClientConfig_SameClientReturne
ClientId = "client1",
ChannelOptions = new MqttClientTcpOptions
{
Server = "mqtt-server"
RemoteEndpoint = new DnsEndPoint("test-server", 4242)
}
}
});
Expand Down Expand Up @@ -201,7 +202,7 @@ public void GetClient_ConsumersWithEquivalentClientConfig_ExceptionThrown()
ClientId = "client1",
ChannelOptions = new MqttClientTcpOptions
{
Server = "mqtt-server"
RemoteEndpoint = new DnsEndPoint("test-server", 4242)
}
}
});
Expand All @@ -214,7 +215,7 @@ public void GetClient_ConsumersWithEquivalentClientConfig_ExceptionThrown()
ClientId = "client1",
ChannelOptions = new MqttClientTcpOptions
{
Server = "mqtt-server"
RemoteEndpoint = new DnsEndPoint("test-server", 4242)
}
}
});
Expand All @@ -233,7 +234,7 @@ public void GetClient_ConsumersWithSameClientIdAndDifferentClientConfig_Exceptio
ClientId = "client1",
ChannelOptions = new MqttClientTcpOptions
{
Server = "mqtt-server"
RemoteEndpoint = new DnsEndPoint("test-server", 4242)
}
}
});
Expand All @@ -245,7 +246,7 @@ public void GetClient_ConsumersWithSameClientIdAndDifferentClientConfig_Exceptio
ClientId = "client1",
ChannelOptions = new MqttClientTcpOptions
{
Server = "mqtt-server2"
RemoteEndpoint = new DnsEndPoint("test-server", 4242)
}
}
});
Expand All @@ -264,7 +265,7 @@ public void GetClient_ProducersWithDifferentClientConfig_DifferentClientsReturne
ClientId = "client1",
ChannelOptions = new MqttClientTcpOptions
{
Server = "mqtt-server"
RemoteEndpoint = new DnsEndPoint("test-server", 4242)
}
}
});
Expand All @@ -276,7 +277,7 @@ public void GetClient_ProducersWithDifferentClientConfig_DifferentClientsReturne
ClientId = "client2",
ChannelOptions = new MqttClientTcpOptions
{
Server = "mqtt-server"
RemoteEndpoint = new DnsEndPoint("test-server", 4242)
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// This code is licensed under MIT license (see LICENSE file for details)

using System;
using System.Net;
using FluentAssertions;
using MQTTnet.Client;
using NSubstitute;
Expand Down Expand Up @@ -36,7 +37,7 @@ public void MoveToMqttTopic_EndpointBuilder_MovePolicyCreated()
policy.As<MoveMessageErrorPolicy>().Endpoint.Name.Should().Be("test-move");
policy.As<MoveMessageErrorPolicy>().Endpoint
.As<MqttProducerEndpoint>().Configuration.ChannelOptions
.As<MqttClientTcpOptions>().Server.Should().Be("tests-server");
.As<MqttClientTcpOptions>().RemoteEndpoint.As<DnsEndPoint>().Host.Should().Be("tests-server");
}

[Fact]
Expand All @@ -53,7 +54,7 @@ public void MoveToMqttTopic_EndpointBuilderWithConfiguration_SkipPolicyCreatedAn
policy.As<MoveMessageErrorPolicy>().MaxFailedAttemptsCount.Should().Be(42);
policy.As<MoveMessageErrorPolicy>().Endpoint
.As<MqttProducerEndpoint>().Configuration.ChannelOptions
.As<MqttClientTcpOptions>().Server.Should().Be("tests-server");
.As<MqttClientTcpOptions>().RemoteEndpoint.As<DnsEndPoint>().Host.Should().Be("tests-server");
}
}
}
Loading

0 comments on commit 910dbc9

Please sign in to comment.