Skip to content

Commit

Permalink
Merge pull request #182 from Sharp-Pulsar/lookupservice
Browse files Browse the repository at this point in the history
Improved `BinaryProtoLookupService` with `Stash`
  • Loading branch information
eaba authored Sep 18, 2024
2 parents ca9377d + 177478d commit 1cea72c
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 30 deletions.
13 changes: 7 additions & 6 deletions src/SharpPulsar.Test/CumulativeAck.cs
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ public async Task TxnAckTestBatchedSharedSub()
for (var i = 0; i < messageCnt; i++)
{
var msg = consumer.Receive(TimeSpan.FromMicroseconds(5000));
_output.WriteLine($"receive msgId: {msg.MessageId}, count : {i}");
_output.WriteLine($"[1] receive msgId: {msg.MessageId}, count : {i}");
await consumer.AcknowledgeAsync(msg.MessageId, txn);
receivedMessageCount++;
}
Expand All @@ -493,7 +493,7 @@ public async Task TxnAckTestBatchedSharedSub()
if (message != null)
{
await consumer.AcknowledgeAsync(message.MessageId, commitTxn);
_output.WriteLine($"receive msgId: {message.MessageId}, count: {i}");
_output.WriteLine($"[1] receive msgId: {message.MessageId}, count: {i}");
}

}
Expand All @@ -512,6 +512,7 @@ public async Task TxnAckTestBatchedSharedSub()
}

[Fact(Skip = "TxnAckTestSharedSub")]
//[Fact]
public async Task TxnAckTestSharedSub()
{
var normalTopic = _nAMESPACE1 + $"/normal-topic-{Guid.NewGuid()}";
Expand Down Expand Up @@ -548,11 +549,11 @@ public async Task TxnAckTestSharedSub()
var msg = await consumer.ReceiveAsync(TimeSpan.FromMicroseconds(5000));
if(msg != null)
{
_output.WriteLine($"receive msgId: {msg.MessageId}, count : {i}");
_output.WriteLine($"[1] receive msgId: {msg.MessageId}, count : {i}");
await consumer.AcknowledgeAsync(msg.MessageId, txn);
receivedMessageCount++;
}

await Task.Delay(TimeSpan.FromSeconds(1));
}

// the messages are pending ack state and can't be received
Expand All @@ -571,10 +572,10 @@ public async Task TxnAckTestSharedSub()
if(message != null)
{
await consumer.AcknowledgeAsync(message.MessageId, commitTxn);
_output.WriteLine($"receive msgId: {message.MessageId}, count: {i}");
_output.WriteLine($"[2] receive msgId: {message.MessageId}, count: {i}");
receivedMessageCount++;
}

await Task.Delay(TimeSpan.FromSeconds(1));
}

// 2) ack committed by a new txn
Expand Down
67 changes: 44 additions & 23 deletions src/SharpPulsar/BinaryProtoLookupService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,27 @@ public BinaryProtoLookupService(IActorRef connectionPool, IActorRef idGenerator,
_connectionPool = connectionPool;
_timeCnx = timeCnx;
_generator = idGenerator;
UpdateServiceUrl(serviceUrl);
/*
LatencyHistogram histo = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup.duration",
"Duration of lookup operations", null,
Attributes.builder().put("pulsar.lookup.transport-type", "binary").build());
histoGetBroker = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "topic").build());
histoGetTopicMetadata =
histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "metadata").build());
histoGetSchema = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "schema").build());
histoListTopics = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "list-topics").build());
*/
}
private void UpdateServiceUrl(string serviceUrl)
{
_serviceNameResolver.UpdateServiceUrl(serviceUrl);
//Sender.Tell(0);
Become(PublicCommands);
}
private void PublicCommands()
{

Receive<SetClient>(c => { });
Receive<UpdateServiceUrl>(u => UpdateServiceUrl(u.ServiceUrl));
Receive<GetUpdateServiceUrl>(u =>
Expand All @@ -85,14 +106,15 @@ public BinaryProtoLookupService(IActorRef connectionPool, IActorRef idGenerator,
try
{
await GetCnxAndRequestId();
Become(PrivateCommands);
await GetBroker(broke);
}
catch (Exception ex)
{
Sender.Tell(new AskResponse(PulsarClientException.Unwrap(ex)));
}
});
ReceiveAsync<GetPartitionedTopicMetadata>(async p =>
ReceiveAsync<GetPartitionedTopicMetadata>(async p =>
{
try
{
Expand All @@ -109,8 +131,14 @@ public BinaryProtoLookupService(IActorRef connectionPool, IActorRef idGenerator,
ReceiveAsync<GetTopicsUnderNamespace>(async t =>
{
await GetCnxAndRequestId();
Become(PrivateCommands);
await TopicsUnderNamespaceAsync(t);
});


}
private void PrivateCommands()
{
ReceiveAsync<SetFindBroker>(async set =>
{
await GetCnxAndRequestId(set.Address);
Expand All @@ -122,24 +150,7 @@ public BinaryProtoLookupService(IActorRef connectionPool, IActorRef idGenerator,
_log.Warning($"Retrying 'GetTopicsUnderNamespace' after {set.NextDelay} ms delay with requestid '{_requestId}'");
await TopicsUnderNamespace(set.Ns, set.Backoff, set.Mode, set.TopicsPattern, set.TopicsHash, set.OpTimeout, set.Sender);
});

UpdateServiceUrl(serviceUrl);
/*
LatencyHistogram histo = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup.duration",
"Duration of lookup operations", null,
Attributes.builder().put("pulsar.lookup.transport-type", "binary").build());
histoGetBroker = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "topic").build());
histoGetTopicMetadata =
histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "metadata").build());
histoGetSchema = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "schema").build());
histoListTopics = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "list-topics").build());
*/
}
private void UpdateServiceUrl(string serviceUrl)
{
_serviceNameResolver.UpdateServiceUrl(serviceUrl);
//Sender.Tell(0);
//Become(Awaiting);
ReceiveAny(s => Stash.Stash());
}
public string GetServiceUrl()
{
Expand Down Expand Up @@ -171,13 +182,12 @@ private async ValueTask TopicsUnderNamespaceAsync(GetTopicsUnderNamespace t)
var opTimeout = _operationTimeout;
var backOff = new BackoffBuilder().SetInitialTime(TimeSpan.FromMilliseconds(100)).SetMandatoryStop(opTimeout.Multiply(2)).SetMax(TimeSpan.FromMinutes(1)).Create();

await TopicsUnderNamespace(t.Namespace, backOff, t.Mode, t.TopicsPattern, t.TopicsHash, opTimeout, Sender);

await TopicsUnderNamespace(t.Namespace, backOff, t.Mode, t.TopicsPattern, t.TopicsHash, opTimeout, Sender);
}
catch (Exception e)
{
Sender.Tell(new AskResponse(PulsarClientException.Unwrap(e)));

Become(PublicCommands);
}
}
private async ValueTask TopicsUnderNamespace(NamespaceName ns, Backoff backoff, Mode mode, string topicsPattern, string topicsHash, TimeSpan opTimeout, IActorRef sender)
Expand Down Expand Up @@ -212,6 +222,7 @@ private async ValueTask TopicsUnderNamespace(NamespaceName ns, Backoff backoff,
if (ex.Message == "Unable to write data to the transport connection: An established connection was aborted by the software in your host machine..")
{
sender.Tell(new AskResponse(PulsarClientException.Unwrap(ex)));
UnstashAll();
return;
}
var nextDelay = Math.Min(backoff.Next(), opTimeout.TotalMilliseconds);
Expand All @@ -236,8 +247,15 @@ private async ValueTask TopicsUnderNamespace(NamespaceName ns, Backoff backoff,
Timers.StartSingleTimer(SetTopicsUnderNamespace.Instance,
new SetTopicsUnderNamespace(ns, backoff, mode, topicsPattern, topicsHash, _opTime, sender, nextDelay), TimeSpan.FromMilliseconds(nextDelay));
//_self.Tell(false);
return;
}
}
UnstashAll();
}
private void UnstashAll()
{
Stash?.UnstashAll();
Become(PublicCommands);
}
private async ValueTask GetBroker(GetBroker broker)
{
Expand All @@ -259,6 +277,7 @@ private async ValueTask FindBroker(TopicName topic, int redirectCount, DnsEndPoi
_log.Error(err.ToString());

replyTo.Tell(new AskResponse(new PulsarClientException(err)));
UnstashAll();
return;
}
var askResponse = await NewLookup(topic, authoritative);
Expand All @@ -277,7 +296,7 @@ private async ValueTask FindBroker(TopicName topic, int redirectCount, DnsEndPoi
{
_log.Warning($"[{topic}] lookup failed : {askResponse.Exception.Message}");
}

UnstashAll();
return;
}
var data = askResponse.ConvertTo<LookupDataResult>();
Expand Down Expand Up @@ -312,6 +331,7 @@ private async ValueTask FindBroker(TopicName topic, int redirectCount, DnsEndPoi
if (data.Redirect)
{
Self.Tell(new SetFindBroker(topic, redirectCount + 1, responseBrokerAddress, data.Authoritative, replyTo));
return;
}
else
{
Expand All @@ -327,6 +347,7 @@ private async ValueTask FindBroker(TopicName topic, int redirectCount, DnsEndPoi
replyTo.Tell(new AskResponse(new PulsarClientException(parseUrlException)));
}
}
UnstashAll();
}
private async ValueTask GetCnxAndRequestId()
{
Expand Down
2 changes: 1 addition & 1 deletion src/SharpPulsar/Client/ClientCnx.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

namespace SharpPulsar.Client
{
internal sealed class ClientCnx : ReceiveActor, IWithUnboundedStash, IWithTimers
internal sealed class ClientCnx : ReceiveActor, IWithTimers
{
private readonly IActorRef _socketClient;
private readonly IAuthentication _authentication;
Expand Down

0 comments on commit 1cea72c

Please sign in to comment.