Skip to content

Commit

Permalink
Improved TableView
Browse files Browse the repository at this point in the history
  • Loading branch information
eaba committed Aug 31, 2024
1 parent c299d9c commit da55bde
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 22 deletions.
14 changes: 12 additions & 2 deletions src/SharpPulsar/Table/TableView.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Routing;
using Akka.Util.Internal;
using SharpPulsar.Interfaces;
using SharpPulsar.Table.Messages;

Expand All @@ -26,9 +29,16 @@ public void Dispose()

public void ForEachAndListen(Action<string, T> action)
{
_tableViewActor.Tell(new ForEachAction<T>(action));
try
{
var data = _tableViewActor.Ask<ImmutableDictionary<string, T>>(TableData.Instance).GetAwaiter().GetResult();
data.ForEach(kv => action(kv.Key, kv.Value));
_tableViewActor.Tell(action);
}
finally { }

}

public virtual int Size()
{
//return _data.Count();
Expand Down
31 changes: 11 additions & 20 deletions src/SharpPulsar/Table/TableViewActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,16 @@ public TableViewActor(PulsarClient client, ISchema<T> schema, TableViewConfigura
readerBuilder.CryptoFailureAction(_conf.CryptoFailureAction);

_reader = _client.NewReader(_schema, readerBuilder);
Receive<ForEachAction<T>>(a => ForEachAndListen(a.Action));

Receive<TableData>(_ =>
{
Sender.Tell(_data.ToImmutableDictionary());
});
//
Receive<Action<string, T>>(a =>
{
_listeners.Add(a);
});
Receive<StartMessage>(_ =>
{
_replyTo = Sender;
Expand Down Expand Up @@ -200,6 +209,7 @@ private void HandleMessage(IMessage<T> msg)
try
{
listener(key, cur);
_log.Info($"Table view listener raised => {key}:{cur}");
}
catch (Exception t)
{
Expand Down Expand Up @@ -261,25 +271,6 @@ private void FilterReceivedMessages(IDictionary<string, ITopicMessageId> lastMes

}

private void ForEach(Action<string, T> action)
{
_data.ForEach(kv=> action(kv.Key, kv.Value));
}

private void ForEachAndListen(Action<string, T> action)
{
// Ensure we iterate over all the existing entry _and_ start the listening from the exact next message
try
{
// Execute the action over existing entries
ForEach(action);

_listeners.Add(action);
}
finally
{
}
}
protected override void PostStop()
{
base.PostStop();
Expand Down

0 comments on commit da55bde

Please sign in to comment.