-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathEventPublisher.swift
78 lines (68 loc) · 3.16 KB
/
EventPublisher.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
//
// EventPublisher.swift
// IFTTT SDK
//
// Copyright © 2020 IFTTT. All rights reserved.
//
import Foundation
/// Defines a generic publisher-subscriber construct allowing multiple subscribers to be
/// notified of events from the publisher.
///
/// All access to the subscriber map is funneled through `subscriberMapDispatchQueue`, so that
/// queue must be serial (the default one is) for the publisher to be safe to use from multiple threads.
final class EventPublisher<Type> {
    /// A typealias to refer to the closure that's executed on each subscriber.
    typealias SubscriberClosure = (Type) -> Void

    /// A subscriber: the sequence number recording when it was added, and the closure to call.
    private typealias Subscriber = (sequenceNumber: UInt64, closure: SubscriberClosure)

    /// The `DispatchQueue` used to access `subscriberMap` and `nextSequenceNumber` on.
    private let subscriberMapDispatchQueue: DispatchQueue

    /// The `DispatchQueue` used to publish events on.
    private let publisherDispatchQueue: DispatchQueue

    // Only touch this dictionary from within a block running on `subscriberMapDispatchQueue`.
    // Dictionaries aren't thread-safe, so it must never be read or subscripted directly elsewhere.
    private var subscriberMap = [UUID: Subscriber]()

    // Sequence number handed to the next subscriber. A counter is used instead of a wall-clock
    // timestamp because timestamps can tie for subscribers added back to back and can move
    // backwards when the system clock is adjusted, either of which would break the ordering.
    // Only touch this from within a block running on `subscriberMapDispatchQueue`.
    private var nextSequenceNumber: UInt64 = 0

    /// Creates an instance of `EventPublisher`.
    ///
    /// - Parameters:
    ///   - subscriberMapDispatchQueue: The queue used to synchronize access to the registered subscribers.
    ///     Defaults to a private serial queue.
    ///   - publisherDispatchQueue: The queue subscriber closures are dispatched to when an event is published.
    ///     Defaults to a private concurrent queue.
    init(subscriberMapDispatchQueue: DispatchQueue = DispatchQueue(label: "com.ifttt.subscriber_map.lock"),
         publisherDispatchQueue: DispatchQueue = DispatchQueue(label: "com.ifttt.publisher", attributes: .concurrent)) {
        self.subscriberMapDispatchQueue = subscriberMapDispatchQueue
        self.publisherDispatchQueue = publisherDispatchQueue
    }

    /// Publishes an event to subscribers.
    ///
    /// Subscriber closures are dispatched asynchronously to the publisher queue in the order the subscribers
    /// were added, oldest first. When the publisher queue is concurrent (the default), the closures are
    /// started in that order but may run at the same time.
    ///
    /// - Parameters:
    ///   - object: The event that is to be delivered to subscribers.
    func onNext(_ object: Type) {
        // `sync` takes a non-escaping closure, so `self` can be used directly without a weak capture.
        subscriberMapDispatchQueue.sync {
            let orderedSubscribers = subscriberMap.values.sorted { $0.sequenceNumber < $1.sequenceNumber }
            for subscriber in orderedSubscribers {
                let closure = subscriber.closure
                // Enqueue while still on the map queue so the batch for one event is enqueued as a unit.
                publisherDispatchQueue.async {
                    closure(object)
                }
            }
        }
    }

    /// Adds a subscriber to be notified of events.
    ///
    /// - Parameters:
    ///   - subscriber: A closure that gets called when there's a new published event.
    /// - Returns: A `UUID` token that can be used to remove a subscriber using `removeSubscriber`.
    @discardableResult
    func addSubscriber(_ subscriber: @escaping SubscriberClosure) -> UUID {
        let identifier = UUID()
        subscriberMapDispatchQueue.sync {
            subscriberMap[identifier] = (sequenceNumber: nextSequenceNumber, closure: subscriber)
            nextSequenceNumber += 1
        }
        return identifier
    }

    /// Removes a subscriber from the subscriber map.
    ///
    /// Passing a token that does not correspond to a registered subscriber has no effect.
    ///
    /// - Parameters:
    ///   - identifier: The token corresponding to the subscriber that is to be removed.
    func removeSubscriber(_ identifier: UUID) {
        subscriberMapDispatchQueue.sync {
            subscriberMap[identifier] = nil
        }
    }
}