Skip to content
This repository was archived by the owner on May 13, 2020. It is now read-only.

Commit ceb78af

Browse files
committed
Improve concurrent state mutation
1 parent 095edd9 commit ceb78af

File tree

3 files changed

+69
-12
lines changed

3 files changed

+69
-12
lines changed

RxReduce/Action.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,6 @@ extension Array: Action where Element == Action {
3535
// MARK: - Default implementation of toAsync: Observable<Action> is also an Action
3636
extension Observable: Action where Element == Action {
3737
public func toAsync () -> Observable<Action> {
38-
return self.map { $0 as Action }
38+
return self
3939
}
4040
}

RxReduce/Store.swift

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,7 @@ public final class Store<State: Equatable> {
2020
private let stateSubject: BehaviorRelay<State>
2121
private var neededReducersPerSubState = [String: Int]()
2222
private var reducers = ContiguousArray<Reducer>()
23-
private let serialDispatchScheduler: SerialDispatchQueueScheduler = {
24-
let serialQueue = DispatchQueue(label: "com.rxswiftcommunity.rxreduce.serialqueue")
25-
return SerialDispatchQueueScheduler.init(queue: serialQueue, internalSerialQueueName: "com.rxswiftcommunity.rxreduce.serialscheduler")
26-
}()
23+
private let serialQueue = DispatchQueue(label: "com.rxswiftcommunity.rxreduce.serialqueue")
2724

2825
/// The global State is exposed via an Observable, just like some kind of "middleware".
2926
/// This global State will trigger a new value after a dispatch(action) has triggered a "onNext" event.
@@ -102,15 +99,18 @@ public final class Store<State: Equatable> {
10299
return action
103100
.toAsync()
104101
.map { [unowned self] (action) -> State in
105-
return self.reducers.reduce(self.stateSubject.value, { (currentState, reducer) -> State in
106-
return reducer(currentState, action)
107-
})
102+
103+
self.serialQueue.sync {
104+
let newState = self.reducers.reduce(self.stateSubject.value, { (currentState, reducer) -> State in
105+
return reducer(currentState, action)
106+
})
107+
108+
self.stateSubject.accept(newState)
109+
}
110+
111+
return self.stateSubject.value
108112
}
109-
.do(onNext: { [unowned self] (newState) in
110-
self.stateSubject.accept(newState)
111-
})
112113
.distinctUntilChanged()
113-
.subscribeOn(self.serialDispatchScheduler)
114114
}
115115

116116
/// Dispatches an Action to the registered Mutators but instead of

RxReduceTests/StoreTests.swift

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,4 +137,61 @@ final class StoreTests: XCTestCase {
137137

138138
waitForExpectations(timeout: 1)
139139
}
140+
141+
func testStoreSchedulers () throws {
142+
// create 2 queues: 1 for the observeOn part of the Rx stream, and 1 for the subscriveOn part
143+
let subscribeOnScheduler: OperationQueueScheduler = {
144+
let queue = OperationQueue()
145+
queue.name = "SUBSCRIBE_ON_QUEUE"
146+
queue.maxConcurrentOperationCount = 1
147+
return OperationQueueScheduler(operationQueue: queue)
148+
}()
149+
let observeOnScheduler: OperationQueueScheduler = {
150+
let queue = OperationQueue()
151+
queue.name = "OBSERVE_ON_QUEUE"
152+
queue.maxConcurrentOperationCount = 1
153+
return OperationQueueScheduler(operationQueue: queue)
154+
}()
155+
156+
// Given
157+
let exp = expectation(description: "Queue expectation")
158+
exp.expectedFulfillmentCount = 3
159+
let actionObservable = Observable<Action>.just (IncreaseAction(increment: 10))
160+
161+
// When
162+
actionObservable
163+
.map { action -> DecreaseAction in
164+
if let queueName = OperationQueue.current?.name!,
165+
queueName == "SUBSCRIBE_ON_QUEUE" {
166+
exp.fulfill()
167+
} else {
168+
XCTFail("Subscribe on Wrong Queue")
169+
}
170+
return DecreaseAction(decrement: 10)
171+
}
172+
.flatMap{ (action) -> Observable<CounterState> in
173+
174+
if let queueName = OperationQueue.current?.name!,
175+
queueName == "SUBSCRIBE_ON_QUEUE" {
176+
exp.fulfill()
177+
} else {
178+
XCTFail("Subscribe on Wrong Queue")
179+
}
180+
return self.store.dispatch(action: action, focusingOn: { $0.counterState })
181+
}
182+
.subscribeOn(subscribeOnScheduler)
183+
.observeOn(observeOnScheduler)
184+
.subscribe(onNext: { counterState in
185+
186+
if let queueName = OperationQueue.current?.name!,
187+
queueName == "OBSERVE_ON_QUEUE" {
188+
exp.fulfill()
189+
} else {
190+
XCTFail("Observe on Wrong Queue")
191+
}
192+
}).disposed(by: self.disposeBag)
193+
194+
// Then
195+
waitForExpectations(timeout: 1)
196+
}
140197
}

0 commit comments

Comments
 (0)