Skip to content

Commit 6aa4bf5

Browse files
authored
Merge pull request #196 from GetStream/feature/uni-91-filter-out-shadowed-messages-in-the-channel
Implement hiding shadowed messages in the IStreamChannel messages and…
2 parents d4d8cf4 + 5eded51 commit 6aa4bf5

File tree

10 files changed

+224
-32
lines changed

10 files changed

+224
-32
lines changed
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
using System;
2+
using System.Collections;
3+
using System.Collections.Generic;
4+
5+
namespace StreamChat.Core.Helpers
6+
{
7+
/// <summary>
8+
/// Filtered list where items can only be added if they pass the filter.
9+
/// </summary>
10+
/// <typeparam name="T"></typeparam>
11+
internal class FilteredList<T> : IList<T>, ICollection<T>, IEnumerable<T>, IReadOnlyList<T>, IReadOnlyCollection<T>
12+
{
13+
public int Count => _internalList.Count;
14+
public bool IsReadOnly => false;
15+
16+
public FilteredList(Predicate<T> addFilter)
17+
{
18+
_addFilter = addFilter ?? throw new ArgumentNullException(nameof(addFilter));
19+
}
20+
21+
public IEnumerator<T> GetEnumerator() => _internalList.GetEnumerator();
22+
23+
public void Add(T item)
24+
{
25+
if (_addFilter(item))
26+
{
27+
_internalList.Add(item);
28+
}
29+
else
30+
{
31+
throw new InvalidOperationException("Item does not pass the filter.");
32+
}
33+
}
34+
35+
public void Clear() => _internalList.Clear();
36+
37+
public bool Contains(T item) => _internalList.Contains(item);
38+
39+
public void CopyTo(T[] array, int arrayIndex) => _internalList.CopyTo(array, arrayIndex);
40+
41+
public bool Remove(T item) => _internalList.Remove(item);
42+
43+
public int IndexOf(T item) => _internalList.IndexOf(item);
44+
45+
public void Insert(int index, T item)
46+
{
47+
if (_addFilter(item))
48+
{
49+
_internalList.Insert(index, item);
50+
}
51+
else
52+
{
53+
throw new InvalidOperationException("Item does not pass the filter.");
54+
}
55+
}
56+
57+
public void RemoveAt(int index) => _internalList.RemoveAt(index);
58+
59+
public T this[int index]
60+
{
61+
get => _internalList[index];
62+
set
63+
{
64+
if (!_addFilter(value))
65+
{
66+
throw new InvalidOperationException("Item does not pass the filter.");
67+
}
68+
_internalList[index] = value;
69+
}
70+
}
71+
72+
public void Sort(IComparer<T> comparer) => _internalList.Sort(comparer);
73+
74+
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
75+
76+
private readonly Predicate<T> _addFilter;
77+
private readonly List<T> _internalList = new List<T>();
78+
}
79+
}

Assets/Plugins/StreamChat/Core/Helpers/FilteredList.cs.meta

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Assets/Plugins/StreamChat/Core/Helpers/ICollectionExt.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,23 @@ public static bool ContainsNoAlloc<TItem>(this List<TItem> source, TItem item)
2828
return false;
2929
}
3030

31+
/// <summary>
32+
/// In Unity 2019.4.40f1 List.Contains allocates memory. Use this allocation free alternative
33+
/// </summary>
34+
[Pure]
35+
public static bool ContainsNoAlloc<TItem>(this IList<TItem> source, TItem item)
36+
{
37+
for (var i = 0; i < source.Count; i++)
38+
{
39+
if (EqualityComparer<TItem>.Default.Equals(source[i], item))
40+
{
41+
return true;
42+
}
43+
}
44+
45+
return false;
46+
}
47+
3148
[Pure]
3249
public static List<TDto> TrySaveToDtoCollection<TSource, TDto>(this List<TSource> source)
3350
where TSource : ISavableTo<TDto>

Assets/Plugins/StreamChat/Core/Helpers/ILoadableFromExt.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ namespace StreamChat.Core.Helpers
88
/// </summary>
99
internal static class ILoadableFromExt
1010
{
11+
//StreamTOdo: rename to TryCreateOrLoadFromDto
1112
/// <summary>
1213
/// Load domain object from the DTO. If the loadable is null, creates a new instance of the domain object.
1314
/// </summary>

Assets/Plugins/StreamChat/Core/State/IRepositoryExt.cs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,21 @@ public static void TryReplaceTrackedObjects<TTracked, TDto>(this IList<TTracked>
2828
foreach (var dto in dtos)
2929
{
3030
var trackedItem = repository.CreateOrUpdate<TTracked, TDto>(dto, out _);
31-
target.Add(trackedItem);
31+
try
32+
{
33+
target.Add(trackedItem);
34+
}
35+
catch
36+
{
37+
}
3238
}
3339
}
3440

3541
/// <summary>
36-
/// Clear target list and replace with items created or updated from DTO collection
42+
/// Append target list with items created from DTO enumeration. Fails silently if target or source are null.
3743
/// </summary>
38-
public static void TryAppendUniqueTrackedObjects<TTracked, TDto>(this IList<TTracked> target, IEnumerable<TDto> dtos,
39-
ICacheRepository<TTracked> repository)
44+
public static void TryAppendUniqueTrackedObjects<TTracked, TDto>(this IList<TTracked> target,
45+
IEnumerable<TDto> dtos, ICacheRepository<TTracked> repository)
4046
where TTracked : class, IStreamStatefulModel, IUpdateableFrom<TDto, TTracked>
4147
{
4248
if (target == null)
@@ -49,6 +55,7 @@ public static void TryAppendUniqueTrackedObjects<TTracked, TDto>(this IList<TTra
4955
return;
5056
}
5157

58+
//StreamTODO: change this to obtaining a hashset from a pool so (1) concurrent executions don't interfere (2) we use Hashset<TTracked> so there's no boxing
5259
_uniqueElements.Clear();
5360

5461
foreach (var t in target)
@@ -62,7 +69,13 @@ public static void TryAppendUniqueTrackedObjects<TTracked, TDto>(this IList<TTra
6269

6370
if (_uniqueElements.Add(trackedItem))
6471
{
65-
target.Add(trackedItem);
72+
try
73+
{
74+
target.Add(trackedItem);
75+
}
76+
catch
77+
{
78+
}
6679
}
6780
}
6881
}

Assets/Plugins/StreamChat/Core/StatefulModels/StreamChannel.cs

Lines changed: 33 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ public async Task<IStreamMessage> SendNewMessageAsync(StreamSendMessageRequest s
196196

197197
//StreamTodo: we update internal cache message without server confirmation that message got accepted. e.g. message could be rejected
198198
//It's ok to update the cache "in good faith" to not introduce update delay but we should handle if message got rejected
199-
var streamMessage = InternalAppendOrUpdateMessage(response.Message);
199+
InternalAppendOrUpdateMessage(response.Message, out var streamMessage);
200200
return streamMessage;
201201
}
202202

@@ -687,7 +687,7 @@ void IUpdateableFrom<UpdateChannelResponseInternalDTO, StreamChannel>.UpdateFrom
687687
internal void HandleMessageNewEvent(MessageNewEventInternalDTO dto)
688688
{
689689
AssertCid(dto.Cid);
690-
InternalAppendOrUpdateMessage(dto.Message);
690+
InternalAppendOrUpdateMessage(dto.Message, out _);
691691

692692
//StreamTodo: how can user react to this change? WatcherCount could internally fire WatchCountChanged event
693693
WatcherCount = GetOrDefault(dto.WatcherCount, WatcherCount);
@@ -696,7 +696,7 @@ internal void HandleMessageNewEvent(MessageNewEventInternalDTO dto)
696696
internal void InternalHandleMessageNewNotification(NotificationNewMessageEventInternalDTO dto)
697697
{
698698
AssertCid(dto.Cid);
699-
InternalAppendOrUpdateMessage(dto.Message);
699+
InternalAppendOrUpdateMessage(dto.Message, out _);
700700

701701
MemberCount = dto.ChannelMemberCount;
702702
}
@@ -806,15 +806,28 @@ protected override string InternalUniqueId
806806
}
807807

808808
private readonly List<StreamChannelMember> _members = new List<StreamChannelMember>();
809-
private readonly List<StreamMessage> _messages = new List<StreamMessage>();
810-
private readonly List<StreamMessage> _pinnedMessages = new List<StreamMessage>();
809+
private readonly FilteredList<StreamMessage> _messages = new FilteredList<StreamMessage>(MessageFilter);
810+
private readonly FilteredList<StreamMessage> _pinnedMessages = new FilteredList<StreamMessage>(MessageFilter);
811811
private readonly List<StreamUser> _watchers = new List<StreamUser>();
812812
private readonly List<StreamRead> _read = new List<StreamRead>();
813813
private readonly List<string> _ownCapabilities = new List<string>();
814814
private readonly List<StreamPendingMessage> _pendingMessages = new List<StreamPendingMessage>();
815+
816+
private readonly MessageCreateAtComparer _messageCreateAtComparer = new MessageCreateAtComparer();
815817

816818
private bool _muted;
817819
private bool _hidden;
820+
821+
//StreamTodo: move outside and change to internal
822+
private class MessageCreateAtComparer : IComparer<IStreamMessage>
823+
{
824+
public int Compare(IStreamMessage x, IStreamMessage y)
825+
{
826+
return x.CreatedAt.CompareTo(y.CreatedAt);
827+
}
828+
}
829+
830+
private static bool MessageFilter(IStreamMessage message) => !message.Shadowed.HasValue || !message.Shadowed.Value;
818831

819832
private void AssertCid(string cid)
820833
{
@@ -824,42 +837,38 @@ private void AssertCid(string cid)
824837
}
825838
}
826839

827-
private StreamMessage InternalAppendOrUpdateMessage(MessageInternalDTO dto)
840+
private bool InternalAppendOrUpdateMessage(MessageInternalDTO dto, out StreamMessage streamMessage)
828841
{
829-
var streamMessage = Cache.TryCreateOrUpdate(dto, out var wasCreated);
842+
streamMessage = Cache.TryCreateOrUpdate(dto, out var wasCreated);
830843
if (wasCreated)
831844
{
832845
if (!_messages.ContainsNoAlloc(streamMessage))
833846
{
834847
var lastMessage = _messages.LastOrDefault();
835848

836-
_messages.Add(streamMessage);
849+
try
850+
{
851+
_messages.Add(streamMessage);
852+
}
853+
catch
854+
{
855+
streamMessage = null;
856+
return false;
857+
}
837858

838859
// If local user sends message during the sync operation.
839860
// It is possible that the locally sent message will be added before the /sync endpoint returns past message events
840861
if (lastMessage != null && streamMessage.CreatedAt < lastMessage.CreatedAt)
841862
{
842-
//StreamTodo: test this more. A good way was to toggle Ethernet on PC and send messages on Android
863+
//StreamTodo: test this more. One way is to toggle Ethernet on PC and send messages from Android client
843864
_messages.Sort(_messageCreateAtComparer);
844865
}
845866

846867
MessageReceived?.Invoke(this, streamMessage);
847868
}
848869
}
849870

850-
return streamMessage;
851-
}
852-
853-
//StreamTodo: move this to the right place
854-
private MessageCreateAtComparer _messageCreateAtComparer = new MessageCreateAtComparer();
855-
856-
//StreamTodo: move outside and change to internal
857-
private class MessageCreateAtComparer : IComparer<IStreamMessage>
858-
{
859-
public int Compare(IStreamMessage x, IStreamMessage y)
860-
{
861-
return x.CreatedAt.CompareTo(y.CreatedAt);
862-
}
871+
return true;
863872
}
864873

865874
//StreamTodo: This deleteBeforeCreatedAt date is the date of event, it does not equal the passed TruncatedAt
@@ -890,7 +899,7 @@ private void InternalTruncateMessages(DateTimeOffset? deleteBeforeCreatedAt = nu
890899

891900
if (systemMessageDto != null)
892901
{
893-
InternalAppendOrUpdateMessage(systemMessageDto);
902+
InternalAppendOrUpdateMessage(systemMessageDto, out _);
894903
}
895904

896905
Truncated?.Invoke(this);
@@ -1096,10 +1105,7 @@ private Task InternalBanUserAsync(IStreamUser user, bool isShadowBan = false, st
10961105
});
10971106
}
10981107

1099-
private void SortMessagesByCreatedAt()
1100-
{
1101-
_messages.Sort((msg1, msg2) => msg1.CreatedAt.CompareTo(msg2.CreatedAt));
1102-
}
1108+
private void SortMessagesByCreatedAt() => _messages.Sort(_messageCreateAtComparer);
11031109

11041110
private UpdateChannelRequestInternalDTO GetUpdateRequestWithCurrentData()
11051111
=> new UpdateChannelRequestInternalDTO

Assets/Plugins/StreamChat/Core/StatefulModels/StreamMessage.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ internal sealed class StreamMessage : StreamStatefulModelBase<StreamMessage>,
7070

7171
public int? ReplyCount { get; private set; }
7272

73+
//StreamTODO: Channel filters out messages based of this field but this won't work if the message got updated
74+
//Channel needs to react to message.updated event and account for already present message becoming shadowed
7375
public bool? Shadowed { get; private set; }
7476

7577
public bool? ShowInChannel { get; private set; }
@@ -190,6 +192,8 @@ public Task MarkMessageAsLastReadAsync()
190192
});
191193
}
192194

195+
public override string ToString() => $"{nameof(IStreamMessage)}: {Text}, From: {User}";
196+
193197
void IUpdateableFrom<MessageInternalDTO, StreamMessage>.UpdateFromDto(MessageInternalDTO dto, ICache cache)
194198
{
195199
_attachments.TryReplaceRegularObjectsFromDto(dto.Attachments, cache);

Assets/Plugins/StreamChat/Libs/Websockets/WebsocketClient.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ await TryCloseAndDisposeAsync(WebSocketCloseStatus.NormalClosure,
9595
{
9696
await HandleConnectionFailedAsync(e);
9797
return;
98+
99+
//StreamTodo: failure should throw an exception. In our scenario this doesn't matter because the caller only logs a potential exception and handled failure based of the ConnectionFailed event only
98100
}
99101

100102
_backgroundSendTimer = new Timer(SendMessagesCallback, null, 0, UpdatePeriod);
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
using System.Collections;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Threading.Tasks;
5+
using NUnit.Framework;
6+
using StreamChat.Core.LowLevelClient.Events;
7+
using UnityEngine.TestTools;
8+
9+
#if STREAM_TESTS_ENABLED
10+
namespace StreamChat.Tests.StatefulClient
11+
{
12+
internal class ChannelsModerationTests : BaseStateIntegrationTests
13+
{
14+
// This test requires active AI moderation policy - we rely on message being shadowed by the AI moderation.
15+
// Set "Threat" AI filter to "shadow block"
16+
// Note: The moderation won't work if you have the `Skip Message Moderation` permission set for this role. Admins probably have this by default.
17+
[UnityTest]
18+
public IEnumerator When_client_sends_message_shadowed_by_ai_moderation_expect_other_client_to_not_receive_it()
19+
=> ConnectAndExecute(When_client_sends_message_shadowed_by_ai_moderation_expect_other_client_to_not_receive_it_Async);
20+
21+
private async Task When_client_sends_message_shadowed_by_ai_moderation_expect_other_client_to_not_receive_it_Async()
22+
{
23+
// Create channel
24+
var channel = await CreateUniqueTempChannelAsync();
25+
26+
var otherClient = await GetConnectedOtherClientAsync();
27+
28+
// Fetch channel on other client to get it loaded into state layer
29+
var otherClientChannel = await otherClient.GetOrCreateChannelWithIdAsync(channel.Type, channel.Id);
30+
31+
32+
Assert.AreEqual(channel.Cid, otherClientChannel.Cid);
33+
34+
var normalMessage = await channel.SendNewMessageAsync("normal message");
35+
36+
// Appropriate message should be present for both clients
37+
Assert.IsTrue(channel.Messages.Contains(normalMessage));
38+
39+
// Wait for other client to receive the message
40+
await WaitWhileFalseAsync(() => otherClientChannel.Messages.Any(m => m.Id == normalMessage.Id));
41+
42+
Assert.IsNotNull(otherClientChannel.Messages.Single(m => m.Id == normalMessage.Id));
43+
44+
// Setup waiting for the message on the other client
45+
var messagesReceivedOnOtherClient = new List<EventMessageNew>();
46+
otherClient.InternalLowLevelClient.MessageReceived += eventMessageNew =>
47+
{
48+
messagesReceivedOnOtherClient.Add(eventMessageNew);
49+
};
50+
51+
var offensiveMessage = await channel.SendNewMessageAsync("I shall kidnap your hamster!");
52+
53+
// Author should believe the message was sent
54+
Assert.IsTrue(channel.Messages.Contains(offensiveMessage));
55+
56+
// Wait for other client to receive the offensive message
57+
await WaitWhileFalseAsync(() => messagesReceivedOnOtherClient.Any(m => m.Message.Id == offensiveMessage.Id));
58+
59+
// Other client should have the offensive message shadowed
60+
Assert.IsFalse(otherClientChannel.Messages.Any(m => m.Id == offensiveMessage.Id));
61+
}
62+
}
63+
}
64+
#endif

Assets/Plugins/StreamChat/Tests/StatefulClient/ChannelsModerationTests.cs.meta

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)