From 177478dbcd8b4b0ccfedec9c0219187138c25794 Mon Sep 17 00:00:00 2001 From: Ebere Abanonu Date: Wed, 18 Sep 2024 18:38:58 +0100 Subject: [PATCH] Improved `BinaryProtoLookupService` with `Stash` --- src/SharpPulsar.Test/CumulativeAck.cs | 13 ++-- src/SharpPulsar/BinaryProtoLookupService.cs | 67 ++++++++++++++------- src/SharpPulsar/Client/ClientCnx.cs | 2 +- 3 files changed, 52 insertions(+), 30 deletions(-) diff --git a/src/SharpPulsar.Test/CumulativeAck.cs b/src/SharpPulsar.Test/CumulativeAck.cs index ed75374f..f6454da1 100644 --- a/src/SharpPulsar.Test/CumulativeAck.cs +++ b/src/SharpPulsar.Test/CumulativeAck.cs @@ -472,7 +472,7 @@ public async Task TxnAckTestBatchedSharedSub() for (var i = 0; i < messageCnt; i++) { var msg = consumer.Receive(TimeSpan.FromMicroseconds(5000)); - _output.WriteLine($"receive msgId: {msg.MessageId}, count : {i}"); + _output.WriteLine($"[1] receive msgId: {msg.MessageId}, count : {i}"); await consumer.AcknowledgeAsync(msg.MessageId, txn); receivedMessageCount++; } @@ -493,7 +493,7 @@ public async Task TxnAckTestBatchedSharedSub() if (message != null) { await consumer.AcknowledgeAsync(message.MessageId, commitTxn); - _output.WriteLine($"receive msgId: {message.MessageId}, count: {i}"); + _output.WriteLine($"[1] receive msgId: {message.MessageId}, count: {i}"); } } @@ -512,6 +512,7 @@ public async Task TxnAckTestBatchedSharedSub() } [Fact(Skip = "TxnAckTestSharedSub")] + //[Fact] public async Task TxnAckTestSharedSub() { var normalTopic = _nAMESPACE1 + $"/normal-topic-{Guid.NewGuid()}"; @@ -548,11 +549,11 @@ public async Task TxnAckTestSharedSub() var msg = await consumer.ReceiveAsync(TimeSpan.FromMicroseconds(5000)); if(msg != null) { - _output.WriteLine($"receive msgId: {msg.MessageId}, count : {i}"); + _output.WriteLine($"[1] receive msgId: {msg.MessageId}, count : {i}"); await consumer.AcknowledgeAsync(msg.MessageId, txn); receivedMessageCount++; } - + await Task.Delay(TimeSpan.FromSeconds(1)); } // the messages are pending ack state and can't be received @@ -571,10 +572,10 @@ public async Task TxnAckTestSharedSub() if(message != null) { await consumer.AcknowledgeAsync(message.MessageId, commitTxn); - _output.WriteLine($"receive msgId: {message.MessageId}, count: {i}"); + _output.WriteLine($"[2] receive msgId: {message.MessageId}, count: {i}"); receivedMessageCount++; } - + await Task.Delay(TimeSpan.FromSeconds(1)); } // 2) ack committed by a new txn diff --git a/src/SharpPulsar/BinaryProtoLookupService.cs b/src/SharpPulsar/BinaryProtoLookupService.cs index 74884724..91324633 100644 --- a/src/SharpPulsar/BinaryProtoLookupService.cs +++ b/src/SharpPulsar/BinaryProtoLookupService.cs @@ -71,6 +71,27 @@ public BinaryProtoLookupService(IActorRef connectionPool, IActorRef idGenerator, _connectionPool = connectionPool; _timeCnx = timeCnx; _generator = idGenerator; + UpdateServiceUrl(serviceUrl); + /* + LatencyHistogram histo = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup.duration", + "Duration of lookup operations", null, + Attributes.builder().put("pulsar.lookup.transport-type", "binary").build()); + histoGetBroker = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "topic").build()); + histoGetTopicMetadata = + histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "metadata").build()); + histoGetSchema = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "schema").build()); + histoListTopics = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "list-topics").build()); + */ + } + private void UpdateServiceUrl(string serviceUrl) + { + _serviceNameResolver.UpdateServiceUrl(serviceUrl); + //Sender.Tell(0); + Become(PublicCommands); + } + private void PublicCommands() + { + Receive(c => { }); Receive(u => UpdateServiceUrl(u.ServiceUrl)); Receive(u => @@ -85,6 +106,7 @@ public BinaryProtoLookupService(IActorRef connectionPool, IActorRef idGenerator, try { await GetCnxAndRequestId(); + Become(PrivateCommands); await GetBroker(broke); } catch (Exception ex) @@ -92,7 +114,7 @@ public BinaryProtoLookupService(IActorRef connectionPool, IActorRef idGenerator, Sender.Tell(new AskResponse(PulsarClientException.Unwrap(ex))); } }); - ReceiveAsync(async p => + ReceiveAsync(async p => { try { @@ -109,8 +131,14 @@ public BinaryProtoLookupService(IActorRef connectionPool, IActorRef idGenerator, ReceiveAsync(async t => { await GetCnxAndRequestId(); + Become(PrivateCommands); await TopicsUnderNamespaceAsync(t); }); + + + } + private void PrivateCommands() + { ReceiveAsync(async set => { await GetCnxAndRequestId(set.Address); @@ -122,24 +150,7 @@ public BinaryProtoLookupService(IActorRef connectionPool, IActorRef idGenerator, _log.Warning($"Retrying 'GetTopicsUnderNamespace' after {set.NextDelay} ms delay with requestid '{_requestId}'"); await TopicsUnderNamespace(set.Ns, set.Backoff, set.Mode, set.TopicsPattern, set.TopicsHash, set.OpTimeout, set.Sender); }); - - UpdateServiceUrl(serviceUrl); - /* - LatencyHistogram histo = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup.duration", - "Duration of lookup operations", null, - Attributes.builder().put("pulsar.lookup.transport-type", "binary").build()); - histoGetBroker = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "topic").build()); - histoGetTopicMetadata = - histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "metadata").build()); - histoGetSchema = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "schema").build()); - histoListTopics = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "list-topics").build()); - */ - } - private void UpdateServiceUrl(string serviceUrl) - { - _serviceNameResolver.UpdateServiceUrl(serviceUrl); - //Sender.Tell(0); - //Become(Awaiting); + ReceiveAny(s => Stash.Stash()); } public string GetServiceUrl() { @@ -171,13 +182,12 @@ private async ValueTask TopicsUnderNamespaceAsync(GetTopicsUnderNamespace t) var opTimeout = _operationTimeout; var backOff = new BackoffBuilder().SetInitialTime(TimeSpan.FromMilliseconds(100)).SetMandatoryStop(opTimeout.Multiply(2)).SetMax(TimeSpan.FromMinutes(1)).Create(); - await TopicsUnderNamespace(t.Namespace, backOff, t.Mode, t.TopicsPattern, t.TopicsHash, opTimeout, Sender); - + await TopicsUnderNamespace(t.Namespace, backOff, t.Mode, t.TopicsPattern, t.TopicsHash, opTimeout, Sender); } catch (Exception e) { Sender.Tell(new AskResponse(PulsarClientException.Unwrap(e))); - + Become(PublicCommands); } } private async ValueTask TopicsUnderNamespace(NamespaceName ns, Backoff backoff, Mode mode, string topicsPattern, string topicsHash, TimeSpan opTimeout, IActorRef sender) @@ -212,6 +222,7 @@ private async ValueTask TopicsUnderNamespace(NamespaceName ns, Backoff backoff, if (ex.Message == "Unable to write data to the transport connection: An established connection was aborted by the software in your host machine..") { sender.Tell(new AskResponse(PulsarClientException.Unwrap(ex))); + UnstashAll(); return; } var nextDelay = Math.Min(backoff.Next(), opTimeout.TotalMilliseconds); @@ -236,8 +247,15 @@ private async ValueTask TopicsUnderNamespace(NamespaceName ns, Backoff backoff, Timers.StartSingleTimer(SetTopicsUnderNamespace.Instance, new SetTopicsUnderNamespace(ns, backoff, mode, topicsPattern, topicsHash, _opTime, sender, nextDelay), TimeSpan.FromMilliseconds(nextDelay)); //_self.Tell(false); + return; } } + UnstashAll(); + } + private void UnstashAll() + { + Stash?.UnstashAll(); + Become(PublicCommands); } private async ValueTask GetBroker(GetBroker broker) { @@ -259,6 +277,7 @@ private async ValueTask FindBroker(TopicName topic, int redirectCount, DnsEndPoi _log.Error(err.ToString()); replyTo.Tell(new AskResponse(new PulsarClientException(err))); + UnstashAll(); return; } var askResponse = await NewLookup(topic, authoritative); @@ -277,7 +296,7 @@ private async ValueTask FindBroker(TopicName topic, int redirectCount, DnsEndPoi { _log.Warning($"[{topic}] lookup failed : {askResponse.Exception.Message}"); } - + UnstashAll(); return; } var data = askResponse.ConvertTo(); @@ -312,6 +331,7 @@ private async ValueTask FindBroker(TopicName topic, int redirectCount, DnsEndPoi if (data.Redirect) { Self.Tell(new SetFindBroker(topic, redirectCount + 1, responseBrokerAddress, data.Authoritative, replyTo)); + return; } else { @@ -327,6 +347,7 @@ private async ValueTask FindBroker(TopicName topic, int redirectCount, DnsEndPoi replyTo.Tell(new AskResponse(new PulsarClientException(parseUrlException))); } } + UnstashAll(); } private async ValueTask GetCnxAndRequestId() { diff --git a/src/SharpPulsar/Client/ClientCnx.cs b/src/SharpPulsar/Client/ClientCnx.cs index 8fb25419..a448a9d2 100644 --- a/src/SharpPulsar/Client/ClientCnx.cs +++ b/src/SharpPulsar/Client/ClientCnx.cs @@ -24,7 +24,7 @@ namespace SharpPulsar.Client { - internal sealed class ClientCnx : ReceiveActor, IWithUnboundedStash, IWithTimers + internal sealed class ClientCnx : ReceiveActor, IWithTimers { private readonly IActorRef _socketClient; private readonly IAuthentication _authentication;