Skip to content

Commit 598643b

Browse files
committed
Re: #47 - Return a Task from Track and Untrack methods
1 parent b48e4b7 commit 598643b

File tree

4 files changed

+69
-19
lines changed

4 files changed

+69
-19
lines changed

Realtime/Interfaces/IRealtimePresence.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using Supabase.Realtime.Socket;
22
using System;
3+
using System.Threading.Tasks;
34
using Supabase.Realtime.Models;
45
using static Supabase.Realtime.Constants;
56

@@ -39,7 +40,14 @@ public enum EventType
3940
/// </summary>
4041
/// <param name="payload"></param>
4142
/// <param name="timeoutMs"></param>
42-
void Track(object? payload, int timeoutMs = DefaultTimeout);
43+
Task Track(object? payload, int timeoutMs = DefaultTimeout);
44+
45+
/// <summary>
46+
/// Untracks a client
47+
/// </summary>
48+
/// <param name="payload"></param>
49+
/// <param name="timeoutMs"></param>
50+
Task Untrack();
4351

4452
/// <summary>
4553
/// Add a presence event handler

Realtime/RealtimeChannel.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ public class RealtimeChannel : IRealtimeChannel
145145
/// </summary>
146146
private readonly List<Push> _buffer = new();
147147

148-
private readonly IRealtimeSocket _socket;
148+
internal readonly IRealtimeSocket Socket;
149149
private IRealtimePresence? _presence;
150150
private IRealtimeBroadcast? _broadcast;
151151
private RealtimeException? _exception;
@@ -157,7 +157,7 @@ public class RealtimeChannel : IRealtimeChannel
157157
private readonly Dictionary<ListenType, List<PostgresChangesHandler>> _postgresChangesHandlers =
158158
new();
159159

160-
private bool CanPush => IsJoined && _socket.IsConnected;
160+
private bool CanPush => IsJoined && Socket.IsConnected;
161161
private bool _hasJoinedOnce;
162162
private readonly Timer _rejoinTimer;
163163
private bool _isRejoining;
@@ -171,8 +171,8 @@ public RealtimeChannel(IRealtimeSocket socket, string channelName, ChannelOption
171171
Options = options;
172172
Options.Parameters ??= new Dictionary<string, string>();
173173

174-
_socket = socket;
175-
_socket.AddStateChangedHandler(HandleSocketStateChanged);
174+
Socket = socket;
175+
Socket.AddStateChangedHandler(HandleSocketStateChanged);
176176

177177
_rejoinTimer = new Timer(options.ClientOptions.Timeout.TotalMilliseconds);
178178
_rejoinTimer.Elapsed += HandleRejoinTimerElapsed;
@@ -508,7 +508,7 @@ public IRealtimeChannel Unsubscribe()
508508

509509
NotifyStateChanged(ChannelState.Leaving);
510510

511-
var leavePush = new Push(_socket, this, ChannelEventLeave);
511+
var leavePush = new Push(Socket, this, ChannelEventLeave);
512512
leavePush.Send();
513513

514514
NotifyStateChanged(ChannelState.Closed, false);
@@ -536,7 +536,7 @@ public Push Push(string eventName, string? type = null, object? payload = null,
536536
};
537537
}
538538

539-
var push = new Push(_socket, this, eventName, type, payload, timeoutMs);
539+
var push = new Push(Socket, this, eventName, type, payload, timeoutMs);
540540
Enqueue(push);
541541

542542
return push;
@@ -582,7 +582,7 @@ public void Rejoin(int timeoutMs = DefaultTimeout)
582582
/// Enqueues a message.
583583
/// </summary>
584584
/// <param name="push"></param>
585-
private void Enqueue(Push push)
585+
internal void Enqueue(Push push)
586586
{
587587
LastPush = push;
588588

@@ -601,7 +601,7 @@ private void Enqueue(Push push)
601601
/// Generates the Join Push message by merging broadcast, presence, and postgres_changes options.
602602
/// </summary>
603603
/// <returns></returns>
604-
private Push GenerateJoinPush() => new(_socket, this, ChannelEventJoin,
604+
private Push GenerateJoinPush() => new(Socket, this, ChannelEventJoin,
605605
payload: new JoinPush(BroadcastOptions, PresenceOptions, PostgresChangesOptions));
606606

607607
/// <summary>
@@ -614,7 +614,7 @@ private void Enqueue(Push push)
614614

615615
if (!string.IsNullOrEmpty(accessToken))
616616
{
617-
return new Push(_socket, this, ChannelAccessToken, payload: new Dictionary<string, string>
617+
return new Push(Socket, this, ChannelAccessToken, payload: new Dictionary<string, string>
618618
{
619619
{ "access_token", accessToken! }
620620
});

Realtime/RealtimePresence.cs

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
using Supabase.Realtime.Socket;
77
using System;
88
using System.Collections.Generic;
9+
using System.Threading.Tasks;
10+
using Supabase.Realtime.Channel;
11+
using Supabase.Realtime.Exceptions;
912
using static Supabase.Realtime.Constants;
1013

1114
namespace Supabase.Realtime;
@@ -100,7 +103,7 @@ public void ClearPresenceEventHandlers(IRealtimePresence.EventType? eventType =
100103
private void NotifyPresenceEventHandlers(IRealtimePresence.EventType eventType)
101104
{
102105
if (!_presenceEventListeners.ContainsKey(eventType)) return;
103-
106+
104107
foreach (var handler in _presenceEventListeners[eventType].ToArray())
105108
handler.Invoke(this, eventType);
106109
}
@@ -149,20 +152,57 @@ public void TriggerDiff(SocketResponse response)
149152
/// </summary>
150153
/// <param name="payload"></param>
151154
/// <param name="timeoutMs"></param>
152-
public void Track(object? payload, int timeoutMs = DefaultTimeout)
155+
public Task Track(object? payload, int timeoutMs = DefaultTimeout)
153156
{
154157
var eventName = Core.Helpers.GetMappedToAttr(ChannelEventName.Presence).Mapping;
155-
_channel.Push(eventName, "track",
158+
var push = new Push(_channel.Socket, _channel, eventName, "track",
156159
new Dictionary<string, object?> { { "event", "track" }, { "payload", payload } }, timeoutMs);
160+
161+
var tcs = new TaskCompletionSource<Push>();
162+
163+
void Handler(IRealtimePush<RealtimeChannel, SocketResponse> chanel, SocketResponse response)
164+
{
165+
tcs.TrySetResult(push);
166+
}
167+
168+
push.AddMessageReceivedHandler(Handler);
169+
170+
push.OnTimeout += (sender, args) =>
171+
{
172+
tcs.SetException(new RealtimeException(args.ToString()) { Reason = FailureHint.Reason.PushTimeout });
173+
};
174+
175+
_channel.Enqueue(push);
176+
177+
return tcs.Task;
157178
}
158179

159180
/// <summary>
160181
/// Untracks an event.
161182
/// </summary>
162-
public void Untrack()
183+
public Task Untrack()
163184
{
164185
var eventName = Core.Helpers.GetMappedToAttr(ChannelEventName.Presence).Mapping;
165-
_channel.Push(eventName, "untrack");
186+
var push = new Push(_channel.Socket, _channel, eventName, "untrack",
187+
new Dictionary<string, object?> { { "event", "untrack" } });
188+
189+
var tcs = new TaskCompletionSource<Push>();
190+
191+
void Handler(IRealtimePush<RealtimeChannel, SocketResponse> chanel, SocketResponse response)
192+
{
193+
tcs.TrySetResult(push);
194+
}
195+
196+
push.AddMessageReceivedHandler(Handler);
197+
198+
push.OnTimeout += (sender, args) =>
199+
{
200+
tcs.TrySetException(new RealtimeException((sender as Push)!.Ref)
201+
{ Reason = FailureHint.Reason.PushTimeout });
202+
};
203+
204+
_channel.Enqueue(push);
205+
return tcs.Task;
166206
}
167207

168208
/// <summary>

RealtimeTests/ChannelPresenceTests.cs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public async Task ClientCanCreatePresence()
5050
{
5151
var state = presence1.CurrentState;
5252
if (state.ContainsKey(guid2) && state[guid2].First().Time != null)
53-
tsc.SetResult(true);
53+
tsc.TrySetResult(true);
5454
});
5555

5656
var client2 = Helpers.SocketClient();
@@ -61,15 +61,17 @@ public async Task ClientCanCreatePresence()
6161
{
6262
var state = presence2.CurrentState;
6363
if (state.ContainsKey(guid1) && state[guid1].First().Time != null)
64-
tsc2.SetResult(true);
64+
tsc2.TrySetResult(true);
6565
});
6666

6767
await channel1.Subscribe();
6868
await channel2.Subscribe();
6969

70-
presence1.Track(new PresenceExample { Time = DateTime.Now });
71-
presence2.Track(new PresenceExample { Time = DateTime.Now });
70+
await presence1.Track(new PresenceExample { Time = DateTime.Now });
71+
await presence2.Track(new PresenceExample { Time = DateTime.Now });
7272

73+
await presence1.Untrack();
74+
7375
await Task.WhenAll(new[] { tsc.Task, tsc2.Task });
7476
}
7577
}

0 commit comments

Comments
 (0)