11using System ;
22using System . Collections . Generic ;
3+ using System . Linq ;
34using System . Runtime . CompilerServices ;
45using System . Threading . Tasks ;
56using System . Timers ;
@@ -154,14 +155,13 @@ public class RealtimeChannel : IRealtimeChannel
154155 private readonly List < MessageReceivedHandler > _messageReceivedHandlers = new ( ) ;
155156 private readonly List < ErrorEventHandler > _errorEventHandlers = new ( ) ;
156157
157- private readonly Dictionary < ListenType , List < PostgresChangesHandler > > _postgresChangesHandlers =
158- new ( ) ;
159-
160158 private bool CanPush => IsJoined && Socket . IsConnected ;
161159 private bool _hasJoinedOnce ;
162160 private readonly Timer _rejoinTimer ;
163161 private bool _isRejoining ;
164162
163+ private List < Binding > _bindings = [ ] ;
164+
165165 /// <summary>
166166 /// Initializes a Channel - must call `Subscribe()` to receive events.
167167 /// </summary>
@@ -330,11 +330,7 @@ private void NotifyMessageReceived(SocketResponse message)
330330 /// <param name="postgresChangeHandler"></param>
331331 public void AddPostgresChangeHandler ( ListenType listenType , PostgresChangesHandler postgresChangeHandler )
332332 {
333- if ( ! _postgresChangesHandlers . ContainsKey ( listenType ) )
334- _postgresChangesHandlers [ listenType ] = new List < PostgresChangesHandler > ( ) ;
335-
336- if ( ! _postgresChangesHandlers [ listenType ] . Contains ( postgresChangeHandler ) )
337- _postgresChangesHandlers [ listenType ] . Add ( postgresChangeHandler ) ;
333+ BindPostgresChangesHandler ( listenType , postgresChangeHandler ) ;
338334 }
339335
340336 /// <summary>
@@ -344,16 +340,16 @@ public void AddPostgresChangeHandler(ListenType listenType, PostgresChangesHandl
344340 /// <param name="postgresChangeHandler"></param>
345341 public void RemovePostgresChangeHandler ( ListenType listenType , PostgresChangesHandler postgresChangeHandler )
346342 {
347- if ( _postgresChangesHandlers . ContainsKey ( listenType ) &&
348- _postgresChangesHandlers [ listenType ] . Contains ( postgresChangeHandler ) )
349- _postgresChangesHandlers [ listenType ] . Remove ( postgresChangeHandler ) ;
343+ RemovePostgresChangesFromBinding ( listenType , postgresChangeHandler ) ;
350344 }
351345
352346 /// <summary>
353347 /// Clears all postgres changes listeners.
354348 /// </summary>
355- public void ClearPostgresChangeHandlers ( ) =>
356- _postgresChangesHandlers . Clear ( ) ;
349+ public void ClearPostgresChangeHandlers ( )
350+ {
351+ _bindings . Clear ( ) ;
352+ }
357353
358354 /// <summary>
359355 /// Adds an error event handler.
@@ -407,15 +403,7 @@ private void NotifyPostgresChanges(EventType eventType, PostgresChangesResponse
407403 _ => ListenType . All
408404 } ;
409405
410- // Invoke the wildcard listener (but only once)
411- if ( listenType != ListenType . All &&
412- _postgresChangesHandlers . TryGetValue ( ListenType . All , out var changesHandler ) )
413- foreach ( var handler in changesHandler . ToArray ( ) )
414- handler . Invoke ( this , response ) ;
415-
416- if ( _postgresChangesHandlers . TryGetValue ( listenType , out var postgresChangesHandler ) )
417- foreach ( var handler in postgresChangesHandler . ToArray ( ) )
418- handler . Invoke ( this , response ) ;
406+ InvokeProperlyHandlerFromBind ( listenType , response ) ;
419407 }
420408
421409 /// <summary>
@@ -428,6 +416,8 @@ private void NotifyPostgresChanges(EventType eventType, PostgresChangesResponse
428416 public IRealtimeChannel Register ( PostgresChangesOptions postgresChangesOptions )
429417 {
430418 PostgresChangesOptions . Add ( postgresChangesOptions ) ;
419+
420+ BindPostgresChangesOptions ( postgresChangesOptions ) ;
431421 return this ;
432422 }
433423
@@ -673,6 +663,8 @@ private void HandleJoinResponse(IRealtimePush<RealtimeChannel, SocketResponse> s
673663 Options . SerializerSettings ) ;
674664 if ( obj ? . Payload == null ) return ;
675665
666+ obj . Payload . Response ? . change ? . ForEach ( BindIdPostgresChanges ) ;
667+
676668 switch ( obj . Payload . Status )
677669 {
678670 // A response was received from the channel
@@ -764,4 +756,113 @@ internal void HandleSocketMessage(SocketResponse message)
764756 break ;
765757 }
766758 }
759+
760+ /// <summary>
761+ /// Create a Binding and add to a list
762+ /// </summary>
763+ /// <param name="options"></param>
764+ private void BindPostgresChangesOptions ( PostgresChangesOptions options )
765+ {
766+ var founded = _bindings . FirstOrDefault ( b => options . Equals ( b . Options ) ) ;
767+ if ( founded != null ) return ;
768+
769+ _bindings . Add (
770+ new Binding
771+ {
772+ Options = options ,
773+ }
774+ ) ;
775+ }
776+
777+ /// <summary>
778+ /// Try to bind a PostgresChangesHandler to a PostgresChangesOptions
779+ /// </summary>
780+ /// <param name="listenType"></param>
781+ /// <param name="handler"></param>
782+ private void BindPostgresChangesHandler ( ListenType listenType , PostgresChangesHandler handler )
783+ {
784+ var founded = _bindings . FirstOrDefault ( b =>
785+ b . Options ? . Event == Core . Helpers . GetMappedToAttr ( listenType ) . Mapping &&
786+ b . Handler == null
787+ ) ;
788+ if ( founded != null )
789+ {
790+ founded . Handler = handler ;
791+ founded . ListenType = listenType ;
792+ return ;
793+ }
794+
795+ BindPostgresChangesHandlerGeneric ( listenType , handler ) ;
796+
797+ }
798+
799+ private void BindPostgresChangesHandlerGeneric ( ListenType listenType , PostgresChangesHandler handler )
800+ {
801+ var founded = _bindings . FirstOrDefault ( b =>
802+ ( b . Options ? . Event == Core . Helpers . GetMappedToAttr ( listenType ) . Mapping || b . Options ? . Event == "*" ) &&
803+ b . Handler == null
804+ ) ;
805+ if ( founded == null ) return ;
806+
807+ founded . Handler = handler ;
808+ founded . ListenType = listenType ;
809+ }
810+
811+ /// <summary>
812+ /// Filter the binding list and try to add an id from socket to its binding
813+ /// </summary>
814+ /// <param name="joinResponse"></param>
815+ private void BindIdPostgresChanges ( PhoenixPostgresChangeResponse joinResponse )
816+ {
817+ var founded = _bindings . FirstOrDefault ( b => b . Options != null &&
818+ b . Options . Event == joinResponse . eventName &&
819+ b . Options . Table == joinResponse . table &&
820+ b . Options . Schema == joinResponse . schema &&
821+ b . Options . Filter == joinResponse . filter ) ;
822+ if ( founded == null ) return ;
823+ founded . Id = joinResponse ? . id ;
824+ }
825+
826+ /// <summary>
827+ /// Try to invoke the handler properly based on event type and socket response
828+ /// </summary>
829+ /// <param name="eventType"></param>
830+ /// <param name="response"></param>
831+ private void InvokeProperlyHandlerFromBind ( ListenType eventType , PostgresChangesResponse response )
832+ {
833+ var all = _bindings . FirstOrDefault ( b =>
834+ {
835+ if ( b . Options == null && response . Payload == null && b . Handler == null ) return false ;
836+
837+ return response . Payload != null && response . Payload . Ids . Contains ( b . Id ) && eventType != ListenType . All &&
838+ b . ListenType == ListenType . All ;
839+ } ) ;
840+
841+ if ( all != null )
842+ {
843+ all . Handler ? . Invoke ( this , response ) ;
844+ return ;
845+ }
846+
847+ // Invoke all specific handler if possible
848+ _bindings . ForEach ( binding =>
849+ {
850+ if ( binding . ListenType != eventType ) return ;
851+ if ( binding . Options == null || response . Payload == null || binding . Handler == null ) return ;
852+
853+ if ( response . Payload . Ids . Contains ( binding . Id ) ) binding . Handler . Invoke ( this , response ) ;
854+ } ) ;
855+ }
856+
857+ /// <summary>
858+ /// Remove handler from binding
859+ /// </summary>
860+ /// <param name="eventType"></param>
861+ /// <param name="handler"></param>
862+ private void RemovePostgresChangesFromBinding ( ListenType eventType , PostgresChangesHandler handler )
863+ {
864+ var binding = _bindings . FirstOrDefault ( b => b . Handler == handler && b . ListenType == eventType ) ;
865+ if ( binding == null ) return ;
866+ _bindings . Remove ( binding ) ;
867+ }
767868}
0 commit comments