Skip to content

fix(): implement filter on PostgresChangeHandler #55

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Feb 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ jobs:

- name: Build
run: dotnet build --configuration Release --no-restore

- uses: supabase/setup-cli@v1
with:
version: latest

- name: Start Supabsae
run: supabase start

- name: Test
run: dotnet test --no-restore

#- name: Add hosts entries
# run: |
Expand Down
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -406,3 +406,8 @@ ASALocalRun/
# Local History for Visual Studio
.localhistory/
/RealtimeTests/.runsettings

# supabase stuffs
supabase/.branches
supabase/.temp
supabase/.env
15 changes: 15 additions & 0 deletions Realtime/Binding.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using Supabase.Realtime.Interfaces;
using Supabase.Realtime.PostgresChanges;

namespace Supabase.Realtime;

public class Binding

Check warning on line 6 in Realtime/Binding.cs

View workflow job for this annotation

GitHub Actions / build-and-test

Missing XML comment for publicly visible type or member 'Binding'

Check warning on line 6 in Realtime/Binding.cs

View workflow job for this annotation

GitHub Actions / build-and-test

Missing XML comment for publicly visible type or member 'Binding'
{
public PostgresChangesOptions? Options { get; set; }

Check warning on line 8 in Realtime/Binding.cs

View workflow job for this annotation

GitHub Actions / build-and-test

Missing XML comment for publicly visible type or member 'Binding.Options'

Check warning on line 8 in Realtime/Binding.cs

View workflow job for this annotation

GitHub Actions / build-and-test

Missing XML comment for publicly visible type or member 'Binding.Options'

public IRealtimeChannel.PostgresChangesHandler? Handler { get; set; }

Check warning on line 10 in Realtime/Binding.cs

View workflow job for this annotation

GitHub Actions / build-and-test

Missing XML comment for publicly visible type or member 'Binding.Handler'

Check warning on line 10 in Realtime/Binding.cs

View workflow job for this annotation

GitHub Actions / build-and-test

Missing XML comment for publicly visible type or member 'Binding.Handler'

public PostgresChangesOptions.ListenType? ListenType { get; set; }

Check warning on line 12 in Realtime/Binding.cs

View workflow job for this annotation

GitHub Actions / build-and-test

Missing XML comment for publicly visible type or member 'Binding.ListenType'

Check warning on line 12 in Realtime/Binding.cs

View workflow job for this annotation

GitHub Actions / build-and-test

Missing XML comment for publicly visible type or member 'Binding.ListenType'

public int? Id { get; set; }

Check warning on line 14 in Realtime/Binding.cs

View workflow job for this annotation

GitHub Actions / build-and-test

Missing XML comment for publicly visible type or member 'Binding.Id'

Check warning on line 14 in Realtime/Binding.cs

View workflow job for this annotation

GitHub Actions / build-and-test

Missing XML comment for publicly visible type or member 'Binding.Id'
}
9 changes: 7 additions & 2 deletions Realtime/Channel/Push.cs
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,13 @@ internal void StartTimeout()
/// <param name="message"></param>
private void HandleSocketMessageReceived(IRealtimeSocket sender, SocketResponse message)
{
if (message.Ref != Ref) return;

// Needs to verify if realtime server won't send the message below anymore after receive a track presence event
// {"ref":"bd07efe5-ca06-4257-b080-79779c6f76c4","event":"phx_reply","payload":{"status":"ok","response":{}},"topic":"realtime:online-users"}
// the message was used to stop timeout handler
// All tests still work on version before 2.34.21
var isPresenceDiff = message is { Event: Constants.EventType.PresenceDiff };
if (!isPresenceDiff && message.Ref != Ref) return;

CancelTimeout();
Response = message;
NotifyMessageReceived(message);
Expand Down
38 changes: 36 additions & 2 deletions Realtime/PostgresChanges/PostgresChangesOptions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Newtonsoft.Json;
using System;
using Newtonsoft.Json;
using Supabase.Core.Attributes;
using System.Collections.Generic;

Expand Down Expand Up @@ -78,7 +79,7 @@ public enum ListenType
/// </summary>
[JsonProperty("event")]
public string Event => Core.Helpers.GetMappedToAttr(_listenType).Mapping!;

private readonly ListenType _listenType;

/// <summary>
Expand All @@ -97,4 +98,37 @@ public PostgresChangesOptions(string schema, string? table = null, ListenType ev
Filter = filter;
Parameters = parameters;
}

private bool Equals(PostgresChangesOptions other)
{
return _listenType == other._listenType && Schema == other.Schema && Table == other.Table && Filter == other.Filter;
}

/// <summary>
/// Check if object are equals
/// </summary>
/// <param name="obj"></param>
/// <returns></returns>
public override bool Equals(object? obj)
{
if (obj is null) return false;
if (obj.GetType() != GetType()) return false;
return Equals((PostgresChangesOptions)obj);
}

/// <summary>
/// Generate hash code
/// </summary>
/// <returns></returns>
public override int GetHashCode()
{
unchecked
{
var hashCode = (int)_listenType;
hashCode = (hashCode * 397) ^ Schema.GetHashCode();
hashCode = (hashCode * 397) ^ (Table != null ? Table.GetHashCode() : 0);
hashCode = (hashCode * 397) ^ (Filter != null ? Filter.GetHashCode() : 0);
return hashCode;
}
}
}
6 changes: 5 additions & 1 deletion Realtime/PostgresChanges/PostgresChangesResponse.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Newtonsoft.Json;
using System.Collections.Generic;
using Newtonsoft.Json;
using Supabase.Postgrest.Models;
using Supabase.Realtime.Socket;

Expand Down Expand Up @@ -73,4 +74,7 @@
/// </summary>
[JsonProperty("data")]
public SocketResponsePayload<T>? Data { get; set; }

[JsonProperty("ids")]
public List<int?> Ids { get; set; }

Check warning on line 79 in Realtime/PostgresChanges/PostgresChangesResponse.cs

View workflow job for this annotation

GitHub Actions / build-and-test

Non-nullable property 'Ids' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the property as nullable.

Check warning on line 79 in Realtime/PostgresChanges/PostgresChangesResponse.cs

View workflow job for this annotation

GitHub Actions / build-and-test

Missing XML comment for publicly visible type or member 'PostgresChangesPayload<T>.Ids'

Check warning on line 79 in Realtime/PostgresChanges/PostgresChangesResponse.cs

View workflow job for this annotation

GitHub Actions / build-and-test

Non-nullable property 'Ids' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the property as nullable.

Check warning on line 79 in Realtime/PostgresChanges/PostgresChangesResponse.cs

View workflow job for this annotation

GitHub Actions / build-and-test

Missing XML comment for publicly visible type or member 'PostgresChangesPayload<T>.Ids'
}
145 changes: 123 additions & 22 deletions Realtime/RealtimeChannel.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using System.Timers;
Expand Down Expand Up @@ -154,14 +155,13 @@ public class RealtimeChannel : IRealtimeChannel
private readonly List<MessageReceivedHandler> _messageReceivedHandlers = new();
private readonly List<ErrorEventHandler> _errorEventHandlers = new();

private readonly Dictionary<ListenType, List<PostgresChangesHandler>> _postgresChangesHandlers =
new();

private bool CanPush => IsJoined && Socket.IsConnected;
private bool _hasJoinedOnce;
private readonly Timer _rejoinTimer;
private bool _isRejoining;

private List<Binding> _bindings = [];

/// <summary>
/// Initializes a Channel - must call `Subscribe()` to receive events.
/// </summary>
Expand Down Expand Up @@ -330,11 +330,7 @@ private void NotifyMessageReceived(SocketResponse message)
/// <param name="postgresChangeHandler"></param>
public void AddPostgresChangeHandler(ListenType listenType, PostgresChangesHandler postgresChangeHandler)
{
if (!_postgresChangesHandlers.ContainsKey(listenType))
_postgresChangesHandlers[listenType] = new List<PostgresChangesHandler>();

if (!_postgresChangesHandlers[listenType].Contains(postgresChangeHandler))
_postgresChangesHandlers[listenType].Add(postgresChangeHandler);
BindPostgresChangesHandler(listenType, postgresChangeHandler);
}

/// <summary>
Expand All @@ -344,16 +340,16 @@ public void AddPostgresChangeHandler(ListenType listenType, PostgresChangesHandl
/// <param name="postgresChangeHandler"></param>
public void RemovePostgresChangeHandler(ListenType listenType, PostgresChangesHandler postgresChangeHandler)
{
if (_postgresChangesHandlers.ContainsKey(listenType) &&
_postgresChangesHandlers[listenType].Contains(postgresChangeHandler))
_postgresChangesHandlers[listenType].Remove(postgresChangeHandler);
RemovePostgresChangesFromBinding(listenType, postgresChangeHandler);
}

/// <summary>
/// Clears all postgres changes listeners.
/// </summary>
public void ClearPostgresChangeHandlers() =>
_postgresChangesHandlers.Clear();
public void ClearPostgresChangeHandlers()
{
_bindings.Clear();
}

/// <summary>
/// Adds an error event handler.
Expand Down Expand Up @@ -407,15 +403,7 @@ private void NotifyPostgresChanges(EventType eventType, PostgresChangesResponse
_ => ListenType.All
};

// Invoke the wildcard listener (but only once)
if (listenType != ListenType.All &&
_postgresChangesHandlers.TryGetValue(ListenType.All, out var changesHandler))
foreach (var handler in changesHandler.ToArray())
handler.Invoke(this, response);

if (_postgresChangesHandlers.TryGetValue(listenType, out var postgresChangesHandler))
foreach (var handler in postgresChangesHandler.ToArray())
handler.Invoke(this, response);
InvokeProperlyHandlerFromBind(listenType, response);
}

/// <summary>
Expand All @@ -428,6 +416,8 @@ private void NotifyPostgresChanges(EventType eventType, PostgresChangesResponse
public IRealtimeChannel Register(PostgresChangesOptions postgresChangesOptions)
{
PostgresChangesOptions.Add(postgresChangesOptions);

BindPostgresChangesOptions(postgresChangesOptions);
return this;
}

Expand Down Expand Up @@ -673,6 +663,8 @@ private void HandleJoinResponse(IRealtimePush<RealtimeChannel, SocketResponse> s
Options.SerializerSettings);
if (obj?.Payload == null) return;

obj.Payload.Response?.change?.ForEach(BindIdPostgresChanges);

switch (obj.Payload.Status)
{
// A response was received from the channel
Expand Down Expand Up @@ -764,4 +756,113 @@ internal void HandleSocketMessage(SocketResponse message)
break;
}
}

/// <summary>
/// Create a Binding and add to a list
/// </summary>
/// <param name="options"></param>
private void BindPostgresChangesOptions(PostgresChangesOptions options)
{
var founded = _bindings.FirstOrDefault(b => options.Equals(b.Options));
if (founded != null) return;

_bindings.Add(
new Binding
{
Options = options,
}
);
}

/// <summary>
/// Try to bind a PostgresChangesHandler to a PostgresChangesOptions
/// </summary>
/// <param name="listenType"></param>
/// <param name="handler"></param>
private void BindPostgresChangesHandler(ListenType listenType, PostgresChangesHandler handler)
{
var founded = _bindings.FirstOrDefault(b =>
b.Options?.Event == Core.Helpers.GetMappedToAttr(listenType).Mapping &&
b.Handler == null
);
if (founded != null)
{
founded.Handler = handler;
founded.ListenType = listenType;
return;
}

BindPostgresChangesHandlerGeneric(listenType, handler);

}

private void BindPostgresChangesHandlerGeneric(ListenType listenType, PostgresChangesHandler handler)
{
var founded = _bindings.FirstOrDefault(b =>
(b.Options?.Event == Core.Helpers.GetMappedToAttr(listenType).Mapping || b.Options?.Event == "*") &&
b.Handler == null
);
if (founded == null) return;

founded.Handler = handler;
founded.ListenType = listenType;
}

/// <summary>
/// Filter the binding list and try to add an id from socket to its binding
/// </summary>
/// <param name="joinResponse"></param>
private void BindIdPostgresChanges(PhoenixPostgresChangeResponse joinResponse)
{
var founded = _bindings.FirstOrDefault(b => b.Options != null &&
b.Options.Event == joinResponse.eventName &&
b.Options.Table == joinResponse.table &&
b.Options.Schema == joinResponse.schema &&
b.Options.Filter == joinResponse.filter);
if (founded == null) return;
founded.Id = joinResponse?.id;
}

/// <summary>
/// Try to invoke the handler properly based on event type and socket response
/// </summary>
/// <param name="eventType"></param>
/// <param name="response"></param>
private void InvokeProperlyHandlerFromBind(ListenType eventType, PostgresChangesResponse response)
{
var all = _bindings.FirstOrDefault(b =>
{
if (b.Options == null && response.Payload == null && b.Handler == null) return false;

return response.Payload != null && response.Payload.Ids.Contains(b.Id) && eventType != ListenType.All &&
b.ListenType == ListenType.All;
});

if (all != null)
{
all.Handler?.Invoke(this, response);
return;
}

// Invoke all specific handler if possible
_bindings.ForEach(binding =>
{
if (binding.ListenType != eventType) return;
if (binding.Options == null || response.Payload == null || binding.Handler == null) return;

if (response.Payload.Ids.Contains(binding.Id)) binding.Handler.Invoke(this, response);
});
}

/// <summary>
/// Remove handler from binding
/// </summary>
/// <param name="eventType"></param>
/// <param name="handler"></param>
private void RemovePostgresChangesFromBinding(ListenType eventType, PostgresChangesHandler handler)
{
var binding = _bindings.FirstOrDefault(b => b.Handler == handler && b.ListenType == eventType);
if (binding == null) return;
_bindings.Remove(binding);
}
}
2 changes: 2 additions & 0 deletions Realtime/RealtimeSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Net.WebSockets;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
Expand Down Expand Up @@ -88,6 +89,7 @@ public RealtimeSocket(string endpoint, ClientOptions options)
_connection = new WebsocketClient(new Uri(EndpointUrl), () =>
{
var socket = new ClientWebSocket();
if (RuntimeInformation.IsOSPlatform(OSPlatform.Create("BROWSER"))) return socket;

foreach (var header in Headers)
socket.Options.SetRequestHeader(header.Key, header.Value);
Expand Down
21 changes: 21 additions & 0 deletions Realtime/Socket/Responses/PhoenixPostgresChangeResponse.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using Newtonsoft.Json;

namespace Supabase.Realtime.Socket.Responses;

public class PhoenixPostgresChangeResponse
{
[JsonProperty("id")]
public int? id { get; set; }

[JsonProperty("event")]
public string? eventName { get; set; }

[JsonProperty("filter")]
public string? filter { get; set; }

[JsonProperty("schema")]
public string? schema { get; set; }

[JsonProperty("table")]
public string? table { get; set; }
}
2 changes: 1 addition & 1 deletion Realtime/Socket/Responses/PhoenixResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class PhoenixResponse
/// The response.
/// </summary>
[JsonProperty("response")]
public object? Response;
public PostgresChangeResponse? Response;

/// <summary>
/// The status.
Expand Down
10 changes: 10 additions & 0 deletions Realtime/Socket/Responses/PostgresChangeResponse.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System.Collections.Generic;
using Newtonsoft.Json;

namespace Supabase.Realtime.Socket.Responses;

public class PostgresChangeResponse
{
[JsonProperty("postgres_changes")]
public List<PhoenixPostgresChangeResponse> change { get; set; }

Check warning on line 9 in Realtime/Socket/Responses/PostgresChangeResponse.cs

View workflow job for this annotation

GitHub Actions / build-and-test

Non-nullable property 'change' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the property as nullable.

Check warning on line 9 in Realtime/Socket/Responses/PostgresChangeResponse.cs

View workflow job for this annotation

GitHub Actions / build-and-test

Non-nullable property 'change' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the property as nullable.
}
Loading
Loading