forked from neuecc/UniRx
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Observable.Concurrency.cs
71 lines (60 loc) · 2.46 KB
/
Observable.Concurrency.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
using System;
using System.Collections.Generic;
using System.Text;
using UniRx.Operators;
namespace UniRx
{
// Concurrency-related operator factories for Observable: Synchronize, ObserveOn,
// SubscribeOn, DelaySubscription and Amb. Each method only validates its arguments
// and constructs the matching operator object from UniRx.Operators; the operator
// classes themselves are declared elsewhere.
public static partial class Observable
{
    /// <summary>
    /// Wraps <paramref name="source"/> in a <c>SynchronizeObservable</c> that uses a
    /// newly allocated gate object private to the returned sequence.
    /// </summary>
    /// <param name="source">Sequence to wrap.</param>
    /// <returns>The wrapping observable.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public static IObservable<T> Synchronize<T>(this IObservable<T> source)
    {
        if (source == null) throw new ArgumentNullException("source");

        return new SynchronizeObservable<T>(source, new object());
    }

    /// <summary>
    /// Wraps <paramref name="source"/> in a <c>SynchronizeObservable</c> that uses the
    /// caller-supplied <paramref name="gate"/> object.
    /// </summary>
    /// <param name="source">Sequence to wrap.</param>
    /// <param name="gate">
    /// Gate object handed to the operator.
    /// NOTE(review): presumably used as the lock target for observer callbacks - confirm
    /// against SynchronizeObservable.
    /// </param>
    /// <returns>The wrapping observable.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="gate"/> is null.
    /// </exception>
    public static IObservable<T> Synchronize<T>(this IObservable<T> source, object gate)
    {
        if (source == null) throw new ArgumentNullException("source");
        if (gate == null) throw new ArgumentNullException("gate");

        return new SynchronizeObservable<T>(source, gate);
    }

    /// <summary>
    /// Wraps <paramref name="source"/> in an <c>ObserveOnObservable</c> bound to
    /// <paramref name="scheduler"/>.
    /// </summary>
    /// <param name="source">Sequence to wrap.</param>
    /// <param name="scheduler">Scheduler passed to the operator.</param>
    /// <returns>The wrapping observable.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="scheduler"/> is null.
    /// </exception>
    public static IObservable<T> ObserveOn<T>(this IObservable<T> source, IScheduler scheduler)
    {
        if (source == null) throw new ArgumentNullException("source");
        if (scheduler == null) throw new ArgumentNullException("scheduler");

        return new ObserveOnObservable<T>(source, scheduler);
    }

    /// <summary>
    /// Wraps <paramref name="source"/> in a <c>SubscribeOnObservable</c> bound to
    /// <paramref name="scheduler"/>.
    /// </summary>
    /// <param name="source">Sequence to wrap.</param>
    /// <param name="scheduler">Scheduler passed to the operator.</param>
    /// <returns>The wrapping observable.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="scheduler"/> is null.
    /// </exception>
    public static IObservable<T> SubscribeOn<T>(this IObservable<T> source, IScheduler scheduler)
    {
        if (source == null) throw new ArgumentNullException("source");
        if (scheduler == null) throw new ArgumentNullException("scheduler");

        return new SubscribeOnObservable<T>(source, scheduler);
    }

    /// <summary>
    /// Wraps <paramref name="source"/> in a <c>DelaySubscriptionObservable</c> with a
    /// relative due time, using <c>Scheduler.DefaultSchedulers.TimeBasedOperations</c>.
    /// </summary>
    /// <param name="source">Sequence to wrap.</param>
    /// <param name="dueTime">Relative time passed to the operator.</param>
    /// <returns>The wrapping observable.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public static IObservable<T> DelaySubscription<T>(this IObservable<T> source, TimeSpan dueTime)
    {
        if (source == null) throw new ArgumentNullException("source");

        return new DelaySubscriptionObservable<T>(source, dueTime, Scheduler.DefaultSchedulers.TimeBasedOperations);
    }

    /// <summary>
    /// Wraps <paramref name="source"/> in a <c>DelaySubscriptionObservable</c> with a
    /// relative due time and an explicit scheduler.
    /// </summary>
    /// <param name="source">Sequence to wrap.</param>
    /// <param name="dueTime">Relative time passed to the operator.</param>
    /// <param name="scheduler">Scheduler passed to the operator.</param>
    /// <returns>The wrapping observable.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="scheduler"/> is null.
    /// </exception>
    public static IObservable<T> DelaySubscription<T>(this IObservable<T> source, TimeSpan dueTime, IScheduler scheduler)
    {
        if (source == null) throw new ArgumentNullException("source");
        if (scheduler == null) throw new ArgumentNullException("scheduler");

        return new DelaySubscriptionObservable<T>(source, dueTime, scheduler);
    }

    /// <summary>
    /// Wraps <paramref name="source"/> in a <c>DelaySubscriptionObservable</c> with an
    /// absolute due time, using <c>Scheduler.DefaultSchedulers.TimeBasedOperations</c>.
    /// </summary>
    /// <param name="source">Sequence to wrap.</param>
    /// <param name="dueTime">Absolute time passed to the operator.</param>
    /// <returns>The wrapping observable.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public static IObservable<T> DelaySubscription<T>(this IObservable<T> source, DateTimeOffset dueTime)
    {
        if (source == null) throw new ArgumentNullException("source");

        return new DelaySubscriptionObservable<T>(source, dueTime, Scheduler.DefaultSchedulers.TimeBasedOperations);
    }

    /// <summary>
    /// Wraps <paramref name="source"/> in a <c>DelaySubscriptionObservable</c> with an
    /// absolute due time and an explicit scheduler.
    /// </summary>
    /// <param name="source">Sequence to wrap.</param>
    /// <param name="dueTime">Absolute time passed to the operator.</param>
    /// <param name="scheduler">Scheduler passed to the operator.</param>
    /// <returns>The wrapping observable.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="scheduler"/> is null.
    /// </exception>
    public static IObservable<T> DelaySubscription<T>(this IObservable<T> source, DateTimeOffset dueTime, IScheduler scheduler)
    {
        if (source == null) throw new ArgumentNullException("source");
        if (scheduler == null) throw new ArgumentNullException("scheduler");

        return new DelaySubscriptionObservable<T>(source, dueTime, scheduler);
    }

    /// <summary>
    /// Array/params convenience overload; forwards to the
    /// <see cref="IEnumerable{T}"/> overload of <c>Amb</c>.
    /// </summary>
    /// <param name="sources">Candidate sequences.</param>
    /// <returns>The combined observable built by the enumerable overload.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="sources"/> is null or contains a null element.
    /// </exception>
    public static IObservable<T> Amb<T>(params IObservable<T>[] sources)
    {
        // The cast selects the IEnumerable overload (which performs the null checks)
        // instead of recursing into this params overload.
        return Amb((IEnumerable<IObservable<T>>)sources);
    }

    /// <summary>
    /// Folds <paramref name="sources"/> left-to-right with the binary <c>Amb</c>,
    /// starting from <c>Observable.Never&lt;T&gt;()</c>.
    /// </summary>
    /// <remarks>
    /// <paramref name="sources"/> is enumerated once, immediately, inside this call.
    /// When it yields no elements the seed <c>Observable.Never&lt;T&gt;()</c> is returned as-is.
    /// </remarks>
    /// <param name="sources">Candidate sequences.</param>
    /// <returns>The nested chain of binary <c>Amb</c> observables.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="sources"/> is null or contains a null element.
    /// </exception>
    public static IObservable<T> Amb<T>(IEnumerable<IObservable<T>> sources)
    {
        if (sources == null) throw new ArgumentNullException("sources");

        var result = Observable.Never<T>();
        foreach (var item in sources)
        {
            // Null elements are rejected by the binary overload's guard on `second`.
            result = result.Amb(item);
        }

        return result;
    }

    /// <summary>
    /// Combines <paramref name="source"/> and <paramref name="second"/> in an
    /// <c>AmbObservable</c>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): by Rx convention Amb propagates whichever sequence reacts first;
    /// the actual behaviour is defined by AmbObservable, which is not visible here.
    /// </remarks>
    /// <param name="source">First candidate sequence.</param>
    /// <param name="second">Second candidate sequence.</param>
    /// <returns>The combined observable.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="second"/> is null.
    /// </exception>
    public static IObservable<T> Amb<T>(this IObservable<T> source, IObservable<T> second)
    {
        if (source == null) throw new ArgumentNullException("source");
        if (second == null) throw new ArgumentNullException("second");

        return new AmbObservable<T>(source, second);
    }
}
}