Skip to content

Commit

Permalink
Merge pull request #177 from Sharp-Pulsar/auto_cluster
Browse files Browse the repository at this point in the history
[Fix] `ServiceUrlProvider`
  • Loading branch information
eaba authored Feb 15, 2024
2 parents a2bc701 + 3591a87 commit 8de5353
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 39 deletions.
10 changes: 5 additions & 5 deletions Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@
<JsonSubTypesVersion>2.0.1</JsonSubTypesVersion>
<K4osCompressionLZ4Version>1.3.6</K4osCompressionLZ4Version>
<MicrosoftCSharpVersion>4.7.0</MicrosoftCSharpVersion>
<MicrosoftExtensionsApiDescriptionClientVersion>8.0.1</MicrosoftExtensionsApiDescriptionClientVersion>
<MicrosoftExtensionsApiDescriptionClientVersion>8.0.2</MicrosoftExtensionsApiDescriptionClientVersion>
<MicrosoftExtensionsLoggingVersion>8.0.0</MicrosoftExtensionsLoggingVersion>
<MicrosoftIORecyclableMemoryStreamVersion>3.0.0</MicrosoftIORecyclableMemoryStreamVersion>
<MicrosoftRestClientRuntimeVersion>2.3.24</MicrosoftRestClientRuntimeVersion>
<NagerPublicSuffixVersion>2.4.0</NagerPublicSuffixVersion>
<NagerPublicSuffixVersion>3.0.0</NagerPublicSuffixVersion>
<NewtonsoftJsonVersion>13.0.3</NewtonsoftJsonVersion>
<NitoAsyncExVersion>5.1.2</NitoAsyncExVersion>
<NodaTimeVersion>3.1.10</NodaTimeVersion>
<NodaTimeVersion>3.1.11</NodaTimeVersion>
<DocfxVersion>2.67.5</DocfxVersion>
<NSwagApiDescriptionClientVersion>14.0.2</NSwagApiDescriptionClientVersion>
<NSwagApiDescriptionClientVersion>14.0.3</NSwagApiDescriptionClientVersion>
<OpenTelemetryVersion>1.7.0</OpenTelemetryVersion>
<PortableBouncyCastleVersion>1.9.0</PortableBouncyCastleVersion>
<ProNBenchxUnitVersion>2.0.0</ProNBenchxUnitVersion>
Expand All @@ -36,7 +36,7 @@
<SystemRuntimeCompilerServicesUnsafeVersion>6.0.0</SystemRuntimeCompilerServicesUnsafeVersion>
<SystemRuntimeSerializationPrimitivesVersion>4.3.0</SystemRuntimeSerializationPrimitivesVersion>
<SystemSecurityCryptographyCngVersion>5.0.0</SystemSecurityCryptographyCngVersion>
<SystemTextJsonVersion>8.0.1</SystemTextJsonVersion>
<SystemTextJsonVersion>8.0.2</SystemTextJsonVersion>
<SystemThreadingTasksDataflowVersion>8.0.0</SystemThreadingTasksDataflowVersion>
<zlibnetmutliplatformVersion>1.0.6</zlibnetmutliplatformVersion>
<ZstdNetVersion>1.4.5</ZstdNetVersion>
Expand Down
4 changes: 2 additions & 2 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<PackageVersion Include="Castle.Core" Version="5.1.1" />
<PackageVersion Include="FluentAssertions" Version="6.12.0" />
<PackageVersion Include="GitHubActionsTestLogger" Version="2.3.3" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.8.0" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.9.0" />
<PackageVersion Include="OpenTelemetry" Version="$(OpenTelemetryVersion)" />
<PackageVersion Include="OpenTelemetry.Exporter.Console" Version="$(OpenTelemetryVersion)" />
<PackageVersion Include="OpenTelemetry.Exporter.InMemory" Version="$(OpenTelemetryVersion)" />
Expand All @@ -64,7 +64,7 @@
<PackageVersion Include="coverlet.collector" Version="6.0.0" />
<PackageVersion Include="Ductus.FluentDocker" Version="2.10.59" />
<PackageVersion Include="Microsoft.Extensions.Configuration" Version="8.0.0" />
<PackageVersion Include="polly" Version="8.2.1" />
<PackageVersion Include="polly" Version="8.3.0" />
<PackageVersion Include="SharpCompress" Version="0.36.0" />
<PackageVersion Include="Microsoft.Azure.Management.AppService.Fluent" Version="1.38.1" />
<PackageVersion Include="Nuke.Common" Version="8.0.0" />
Expand Down
37 changes: 26 additions & 11 deletions Tutorials/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using SharpPulsar.Builder;
using SharpPulsar.Interfaces;
using SharpPulsar.Schemas;
using SharpPulsar.ServiceProvider;
using SharpPulsar.TransactionImpl;
using SharpPulsar.Trino;
using SharpPulsar.Trino.Message;
Expand Down Expand Up @@ -55,24 +56,38 @@ static async Task Main(string[] args)
}
if (selection.Equals("1"))
url = "pulsar+ssl://127.0.0.1:6651";


var clientConfig = new PulsarClientConfigBuilder()
.ServiceUrl(url);

if (selection.Equals("1"))
var clientConfig = new PulsarClientConfigBuilder();
Console.WriteLine("auto-cluster or config?");
var cluster = Console.ReadLine();
if (cluster == "auto-cluster")
{
var ca = new System.Security.Cryptography.X509Certificates.X509Certificate2(@"certs/ca.cert.pem");
clientConfig.EnableTls(true);
clientConfig.AddTrustedAuthCert(ca);
clientConfig
.ServiceUrlProvider(new AutoClusterFailover((AutoClusterFailoverBuilder)new AutoClusterFailoverBuilder()
.Primary("pulsar://localhost:6650")
.Secondary(new List<string> { "pulsar://localhost:6650" })
.CheckInterval(TimeSpan.FromSeconds(20))
.FailoverDelay(TimeSpan.FromSeconds(20))
.SwitchBackDelay(TimeSpan.FromSeconds(20))));
}
else
{
clientConfig
.ServiceUrl(url)
.EnableTransaction(true);
if (selection.Equals("1"))
{
var ca = new System.Security.Cryptography.X509Certificates.X509Certificate2(@"certs/ca.cert.pem");
clientConfig.EnableTls(true);
clientConfig.AddTrustedAuthCert(ca);
}
}

Console.WriteLine("Please, time to execute some command, which do you want?");
var cmd = Console.ReadLine();

clientConfig.EnableTransaction(true);

//pulsar actor system
var pulsarSystem = PulsarSystem.GetInstance(actorSystemName: "program");


var pulsarClient = await pulsarSystem.NewClient(clientConfig);
_client = pulsarClient;
Expand Down
18 changes: 14 additions & 4 deletions src/SharpPulsar.Test/AutoClusterFailoverTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
using System.Threading.Tasks;
using System.Collections.Generic;
using Xunit;
using Akka.Actor;
using SharpPulsar.ServiceProvider;
using Ductus.FluentDocker.Commands;

namespace SharpPulsar.Test
{
Expand All @@ -19,11 +22,11 @@ public class AutoClusterFailoverTest : IAsyncLifetime
private readonly ITestOutputHelper _output;
//private TaskCompletionSource<PulsarClient> _tcs;
private PulsarSystem _system;
private PulsarClientConfigBuilder _configBuilder;
// private PulsarClientConfigBuilder _configBuilder;
public AutoClusterFailoverTest(ITestOutputHelper output, PulsarFixture fixture)
{
_output = output;
_configBuilder = fixture.ConfigBuilder;
// _configBuilder = fixture.ConfigBuilder;
_system = fixture.System;
}
[Fact]
Expand Down Expand Up @@ -64,7 +67,7 @@ await producer.NewMessage().KeyBytes(byteKey)

Assert.True(message.HasBase64EncodedKey());
var receivedMessage = Encoding.UTF8.GetString(message.Data);
_output.WriteLine($"Received message: [{receivedMessage}]");
_output.WriteLine($"Received message: [{receivedMessage}]. Time: {DateTime.Now.ToLongTimeString()}");
Assert.Equal("AutoMessage", receivedMessage);
}
await Act(consumer, producer);
Expand All @@ -91,7 +94,14 @@ public async Task InitializeAsync()
_tcs.TrySetResult(client);
})();
_client = await _tcs.Task; */
_client = await _system.NewClient(_configBuilder);
var auto = new AutoClusterFailover((AutoClusterFailoverBuilder)new AutoClusterFailoverBuilder()
.Primary("pulsar://localhost:6650")
.Secondary(new List<string> { "pulsar://localhost:6650" })
.CheckInterval(TimeSpan.FromSeconds(20))
.FailoverDelay(TimeSpan.FromSeconds(20))
.SwitchBackDelay(TimeSpan.FromSeconds(20)));
var b = new PulsarClientConfigBuilder().ServiceUrlProvider(auto);
_client = await _system.NewClient(b);
}

public async Task DisposeAsync()
Expand Down
31 changes: 15 additions & 16 deletions src/SharpPulsar/Builder/PulsarClientConfigBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using SharpPulsar.Common;
using SharpPulsar.Configuration;
using SharpPulsar.Interfaces;
using SharpPulsar.Precondition;
using System;
using System.Collections.Generic;
using System.Security.Cryptography.X509Certificates;
Expand Down Expand Up @@ -207,24 +208,22 @@ public ClientConfigurationData ClientConfigurationData
{
get
{
if (string.IsNullOrWhiteSpace(_conf.ServiceUrl) && _conf.ServiceUrlProvider == null)
{
throw new ArgumentException("service URL or service URL provider needs to be specified on the ClientBuilder object.");
}
if (!string.IsNullOrWhiteSpace(_conf.ServiceUrl) && _conf.ServiceUrlProvider != null)
{
throw new ArgumentException("Can only chose one way service URL or service URL provider.");
}
if (string.IsNullOrWhiteSpace(_conf.ServiceUrl))
if (_conf.ServiceUrlProvider == null)
throw new ArgumentNullException("service URL or service URL provider needs to be specified on the ClientBuilder object.");

// Condition.CheckArgument(string.IsNullOrWhiteSpace(_conf.ServiceUrl) && _conf.ServiceUrlProvider == null,
// "service URL or service URL provider needs to be specified on the ClientBuilder object.");

Condition.CheckArgument(!string.IsNullOrWhiteSpace(_conf.ServiceUrl) || _conf.ServiceUrlProvider != null,
"Can only chose one way service URL or service URL provider.");

if (_conf.ServiceUrlProvider != null)
{
if (string.IsNullOrWhiteSpace(_conf.ServiceUrlProvider.ServiceUrl))
{
throw new ArgumentException("Cannot get service url from service url provider.");
}
else
{
_conf.ServiceUrl = _conf.ServiceUrlProvider.ServiceUrl;
}
Condition.CheckArgument(!string.IsNullOrWhiteSpace(_conf.ServiceUrlProvider.ServiceUrl),
"Cannot get service url from service url provider.");

_conf.ServiceUrl = _conf.ServiceUrlProvider.ServiceUrl;
}
return _conf;
}
Expand Down
3 changes: 2 additions & 1 deletion src/SharpPulsar/Tls/TlsHostnameVerifier.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Nager.PublicSuffix;
using Nager.PublicSuffix.RuleProviders;
using SharpPulsar.Common;
using System;
using System.Collections.Generic;
Expand Down Expand Up @@ -78,7 +79,7 @@ public static HostNameType valueOf(string name)
}
private readonly ILoggingAdapter _log;

private readonly DomainParser _publicSuffixMatcher = new DomainParser(new WebTldRuleProvider());
private readonly DomainParser _publicSuffixMatcher = new DomainParser(new SimpleHttpRuleProvider());
public TlsHostnameVerifier(ILoggingAdapter log)
{
_log = log;
Expand Down

0 comments on commit 8de5353

Please sign in to comment.