88
99#if canImport(Combine)
1010import Combine
11+ import class Foundation. NSRecursiveLock
1112
1213@available ( OSX 10 . 15 , iOS 13 . 0 , tvOS 13 . 0 , watchOS 6 . 0 , * )
1314public extension Publisher {
@@ -20,8 +21,145 @@ public extension Publisher {
2021 /// - returns: A publisher emitting the values of all emitted publishers in order.
2122 func concatMap< T, P> (
2223 _ transform: @escaping ( Self . Output ) -> P
23- ) -> Publishers . FlatMap < P , Self > where T == P . Output , P: Publisher , Self. Failure == P . Failure {
24- flatMap ( maxPublishers: . max( 1 ) , transform)
24+ ) -> Publishers . ConcatMap < P , Self > where T == P . Output , P: Publisher , Self. Failure == P . Failure {
25+ return Publishers . ConcatMap ( upstream: self , transform: transform)
26+ }
27+ }
28+
29+ @available ( OSX 10 . 15 , iOS 13 . 0 , tvOS 13 . 0 , watchOS 6 . 0 , * )
30+ public extension Publishers {
31+ final class ConcatMap < NewPublisher, Upstream> : Publisher where NewPublisher: Publisher , Upstream: Publisher , NewPublisher. Failure == Upstream . Failure {
32+ public typealias Transform = ( Upstream . Output ) -> NewPublisher
33+ public typealias Output = NewPublisher . Output
34+ public typealias Failure = Upstream . Failure
35+
36+ public let upstream : Upstream
37+ public let transform : Transform
38+
39+ public init (
40+ upstream: Upstream ,
41+ transform: @escaping Transform
42+ ) {
43+ self . upstream = upstream
44+ self . transform = transform
45+ }
46+
47+ public func receive< S: Subscriber > ( subscriber: S )
48+ where Output == S . Input , Failure == S . Failure {
49+ subscriber. receive (
50+ subscription: Subscription (
51+ upstream: upstream,
52+ transform: transform,
53+ downstream: subscriber
54+ )
55+ )
56+ }
57+ }
58+ }
59+
60+ // MARK: - Subscription
61+ @available ( OSX 10 . 15 , iOS 13 . 0 , tvOS 13 . 0 , watchOS 6 . 0 , * )
62+ private extension Publishers . ConcatMap {
63+ final class Subscription < Downstream: Subscriber > : Combine . Subscription where Downstream. Input == Output , Downstream. Failure == Failure {
64+ private var sink : Sink < Downstream > ?
65+
66+ init (
67+ upstream: Upstream ,
68+ transform: @escaping Transform ,
69+ downstream: Downstream
70+ ) {
71+ self . sink = Sink (
72+ upstream: upstream,
73+ downstream: downstream,
74+ transform: { transform ( $0) }
75+ )
76+ }
77+
78+ func request( _ demand: Subscribers . Demand ) {
79+ sink? . demand ( demand)
80+ }
81+
82+ func cancel( ) {
83+ sink = nil
84+ }
85+ }
86+ }
87+
88+ // MARK: - Sink
89+ @available ( OSX 10 . 15 , iOS 13 . 0 , tvOS 13 . 0 , watchOS 6 . 0 , * )
90+ private extension Publishers . ConcatMap {
91+ final class Sink < Downstream: Subscriber > : CombineExt . Sink < Upstream , Downstream >
92+ where Downstream. Input == Output , Downstream. Failure == Failure {
93+ private let lock = NSRecursiveLock ( )
94+ private let transform : Transform
95+ private var activePublisher : NewPublisher ?
96+ private var bufferedPublishers : [ NewPublisher ]
97+ private var cancellables : Set < AnyCancellable >
98+
99+ init (
100+ upstream: Upstream ,
101+ downstream: Downstream ,
102+ transform: @escaping Transform
103+ ) {
104+ self . transform = transform
105+ self . bufferedPublishers = [ ]
106+ self . cancellables = [ ]
107+ super. init (
108+ upstream: upstream,
109+ downstream: downstream,
110+ transformFailure: { $0 }
111+ )
112+ }
113+
114+ override func receive( _ input: Upstream . Output ) -> Subscribers . Demand {
115+ let mapped = transform ( input)
116+
117+ lock. lock ( )
118+ if activePublisher == nil {
119+ lock. unlock ( )
120+ setActivePublisher ( mapped)
121+ } else {
122+ lock. unlock ( )
123+ bufferedPublishers. append ( mapped)
124+ }
125+
126+ return . unlimited
127+ }
128+
129+ private func setActivePublisher( _ publisher: NewPublisher ) {
130+ lock. lock ( )
131+ defer { lock. unlock ( ) }
132+ activePublisher = publisher
133+
134+ publisher. sink (
135+ receiveCompletion: { completion in
136+ switch completion {
137+ case . finished:
138+ self . lock. lock ( )
139+ guard let next = self . bufferedPublishers. first else {
140+ self . lock. unlock ( )
141+ return
142+ }
143+ self . bufferedPublishers. removeFirst ( )
144+ self . lock. unlock ( )
145+ self . setActivePublisher ( next)
146+ case . failure( let error) :
147+ self . receive ( completion: . failure( error) )
148+ }
149+ } ,
150+ receiveValue: { value in
151+ _ = self . buffer. buffer ( value: value)
152+ }
153+ )
154+ . store ( in: & cancellables)
155+ }
156+ }
157+ }
158+
159+ @available ( OSX 10 . 15 , iOS 13 . 0 , tvOS 13 . 0 , watchOS 6 . 0 , * )
160+ extension Publishers . ConcatMap . Subscription : CustomStringConvertible {
161+ var description : String {
162+ return " ConcatMap.Subscription< \( Downstream . Input. self) , \( Downstream . Failure. self) > "
25163 }
26164}
27165#endif
0 commit comments