Skip to content

Commit aa32432

Browse files
Fix threading issue in observable cache. Fixes reactivemarbles#538 (reactivemarbles#539)
Co-authored-by: Glenn <5834289+glennawatson@users.noreply.github.com>
1 parent 5ac9fb9 commit aa32432

File tree

1 file changed

+11
-16
lines changed

1 file changed

+11
-16
lines changed

src/DynamicData/Cache/ObservableCache.cs

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -107,27 +107,22 @@ public IObservable<IChangeSet<TObject, TKey>> Connect(Func<TObject, bool>? predi
107107
Observable.Create<IChangeSet<TObject, TKey>>(
108108
observer =>
109109
{
110-
var initial = InternalEx.Return(() =>
110+
lock (_locker)
111111
{
112-
// lock getting initial changes and rely on a combination of Concat
113-
// + _changes being synchronized to produce thread safety (I hope!)
114-
lock (_locker)
112+
var initial = InternalEx.Return(() => (IChangeSet<TObject, TKey>)GetInitialUpdates(predicate));
113+
var changes = initial.Concat(_changes);
114+
115+
if (predicate != null)
115116
{
116-
return (IChangeSet<TObject, TKey>)GetInitialUpdates(predicate);
117+
changes = changes.Filter(predicate, suppressEmptyChangeSets);
118+
}
119+
else if (suppressEmptyChangeSets)
120+
{
121+
changes = changes.NotEmpty();
117122
}
118-
});
119123

120-
var changes = Observable.Defer(() => initial).Concat(_changes);
121-
if (predicate != null)
122-
{
123-
changes = changes.Filter(predicate, suppressEmptyChangeSets);
124-
}
125-
else if (suppressEmptyChangeSets)
126-
{
127-
changes = changes.NotEmpty();
124+
return changes.SubscribeSafe(observer);
128125
}
129-
130-
return changes.SubscribeSafe(observer);
131126
});
132127

133128
public void Dispose() => _cleanUp.Dispose();

0 commit comments

Comments
 (0)