Skip to content

Commit 40428ff

Browse files
feat: Optimize perf in burst or ctx updates (#85)
Suggestion to improve the performance from #84. In this PR, the queue to process state updates is "LastWins" policy and prevents execution of unnecessary intermediate states in the case of rapid calls to `setEvaluationContext`, for example Signed-off-by: Fabrizio Demaria <fabrizio.f.demaria@gmail.com>
1 parent 8cc95c9 commit 40428ff

File tree

2 files changed

+285
-12
lines changed

2 files changed

+285
-12
lines changed

Sources/OpenFeature/OpenFeatureAPI.swift

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,36 @@
11
import Combine
22
import Foundation
33

4-
/// Simple serial async task queue for serializing operations
5-
private actor AsyncSerialQueue {
6-
private var last: Task<Void, Never>?
7-
8-
/// Runs the given operation after previously enqueued work completes.
4+
/// Async queue that only executes the latest operation, cancelling pending ones
5+
/// This implements "last wins" semantics where intermediate operations that haven't
6+
/// started yet will be skipped in favor of the most recently queued operation.
7+
internal actor AsyncLastWinsQueue {
8+
private var currentTask: Task<Void, Never>?
9+
private var pendingOperation: (() async -> Void)?
10+
11+
/// Runs the given operation, but only the latest one will execute.
12+
/// Any pending operations that haven't started yet will be skipped.
913
func run(_ operation: @Sendable @escaping () async -> Void) async {
10-
let previous = last
11-
let task = Task {
12-
_ = await previous?.result
13-
await operation()
14+
// Store this as the pending operation
15+
pendingOperation = operation
16+
17+
// If there's already a task running, it will pick up this new operation when done
18+
if currentTask == nil {
19+
await executeNext()
20+
}
21+
}
22+
23+
private func executeNext() async {
24+
while let operation = pendingOperation {
25+
pendingOperation = nil // Clear pending before starting
26+
27+
let task = Task {
28+
await operation()
29+
}
30+
currentTask = task
31+
await task.value
32+
currentTask = nil
1433
}
15-
last = task
16-
await task.value
1734
}
1835
}
1936

@@ -22,7 +39,7 @@ private actor AsyncSerialQueue {
2239
public class OpenFeatureAPI {
2340
private let eventHandler = EventHandler()
2441
private let stateQueue = DispatchQueue(label: "com.openfeature.state.queue")
25-
private let atomicOperationsQueue = AsyncSerialQueue()
42+
private let atomicOperationsQueue = AsyncLastWinsQueue()
2643

2744
private(set) var providerSubject = CurrentValueSubject<FeatureProvider?, Never>(nil)
2845
private(set) var evaluationContext: EvaluationContext?
Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
import XCTest
2+
@testable import OpenFeature
3+
4+
class AsyncLastWinsQueueTests: XCTestCase {
5+
6+
// MARK: - Basic Behavior Tests
7+
8+
func testSingleOperationExecutes() async throws {
9+
let queue = AsyncLastWinsQueue()
10+
let executed = ActorBox<Bool>(false)
11+
12+
await queue.run {
13+
await executed.set(true)
14+
}
15+
16+
let result = await executed.get()
17+
XCTAssertTrue(result, "Single operation should execute")
18+
}
19+
20+
func testSequentialOperationsAllExecute() async throws {
21+
let queue = AsyncLastWinsQueue()
22+
let counter = ActorBox<Int>(0)
23+
24+
// Execute 3 operations sequentially (one at a time)
25+
await queue.run {
26+
await counter.increment()
27+
}
28+
29+
await queue.run {
30+
await counter.increment()
31+
}
32+
33+
await queue.run {
34+
await counter.increment()
35+
}
36+
37+
let result = await counter.get()
38+
XCTAssertEqual(result, 3, "All sequential operations should execute")
39+
}
40+
41+
// MARK: - Core "Last Wins" Tests
42+
43+
func testConcurrentOperationsSkipIntermediate() async throws {
44+
let queue = AsyncLastWinsQueue()
45+
let executionOrder = ActorBox<[Int]>([])
46+
let blockFirstOperation = ActorBox<Bool>(true)
47+
48+
// Start 5 operations concurrently
49+
// The first one will block, the middle ones should be skipped,
50+
// only the last one should execute after the first completes
51+
await withTaskGroup(of: Void.self) { group in
52+
for i in 0..<5 {
53+
group.addTask {
54+
await queue.run {
55+
// First operation blocks until we release it
56+
if i == 0 {
57+
while await blockFirstOperation.get() {
58+
try? await Task.sleep(nanoseconds: 10_000_000) // 10ms
59+
}
60+
}
61+
await executionOrder.append(i)
62+
}
63+
}
64+
}
65+
66+
// Give time for all operations to be queued
67+
try? await Task.sleep(nanoseconds: 50_000_000) // 50ms
68+
69+
// Release the first operation
70+
await blockFirstOperation.set(false)
71+
}
72+
73+
let order = await executionOrder.get()
74+
75+
// Should execute: operation 0 (first, was running) and operation 4 (last wins)
76+
XCTAssertEqual(order.count, 2, "Should only execute 2 operations: first and last")
77+
XCTAssertEqual(order[0], 0, "First operation should execute first")
78+
XCTAssertEqual(order[1], 4, "Last operation should execute second")
79+
}
80+
81+
func testRapidFireOnlyExecutesFirstAndLast() async throws {
82+
let queue = AsyncLastWinsQueue()
83+
let executed = ActorBox<Set<Int>>([])
84+
85+
await withTaskGroup(of: Void.self) { group in
86+
// Launch 100 operations that all try to start simultaneously
87+
for i in 0..<100 {
88+
group.addTask {
89+
await queue.run {
90+
// Simulate some work
91+
try? await Task.sleep(nanoseconds: 1_000_000) // 1ms
92+
await executed.insert(i)
93+
}
94+
}
95+
}
96+
}
97+
98+
let executedSet = await executed.get()
99+
100+
// Should execute much fewer than 100 operations
101+
XCTAssertLessThan(executedSet.count, 100, "Should skip many intermediate operations")
102+
103+
// First operation should execute (it started immediately)
104+
XCTAssertTrue(executedSet.contains(0), "First operation should execute")
105+
106+
// Last operation should execute (last wins)
107+
XCTAssertTrue(executedSet.contains(99), "Last operation should execute")
108+
109+
// Total executed should be small (first + maybe a few more + last)
110+
XCTAssertLessThan(executedSet.count, 10, "Should execute very few operations in rapid fire")
111+
}
112+
113+
// MARK: - Ordering and Consistency Tests
114+
115+
func testOperationsNeverRunConcurrently() async throws {
116+
let queue = AsyncLastWinsQueue()
117+
let concurrentExecutions = ActorBox<Int>(0)
118+
let maxConcurrent = ActorBox<Int>(0)
119+
let errors = ActorBox<[String]>([])
120+
121+
await withTaskGroup(of: Void.self) { group in
122+
for i in 0..<50 {
123+
group.addTask {
124+
await queue.run {
125+
let current = await concurrentExecutions.increment()
126+
127+
if current > 1 {
128+
await errors.append("Concurrent execution detected at operation \(i)")
129+
}
130+
131+
await maxConcurrent.updateMax(current)
132+
133+
// Simulate work
134+
try? await Task.sleep(nanoseconds: 5_000_000) // 5ms
135+
136+
await concurrentExecutions.decrement()
137+
}
138+
}
139+
}
140+
}
141+
142+
let max = await maxConcurrent.get()
143+
let errorList = await errors.get()
144+
145+
XCTAssertEqual(max, 1, "Should never have more than 1 concurrent execution")
146+
XCTAssertTrue(errorList.isEmpty, "Should have no concurrent execution errors: \(errorList)")
147+
}
148+
149+
func testFinalStateReflectsLastOperation() async throws {
150+
let queue = AsyncLastWinsQueue()
151+
let finalValue = ActorBox<String?>(nil)
152+
let slowOperationStarted = ActorBox<Bool>(false)
153+
let slowOperationCanProceed = ActorBox<Bool>(false)
154+
155+
await withTaskGroup(of: Void.self) { group in
156+
// Start a slow operation
157+
group.addTask {
158+
await queue.run {
159+
await slowOperationStarted.set(true)
160+
// Wait for signal
161+
while !(await slowOperationCanProceed.get()) {
162+
try? await Task.sleep(nanoseconds: 10_000_000)
163+
}
164+
await finalValue.set("slow")
165+
}
166+
}
167+
168+
// Wait for slow operation to start
169+
while !(await slowOperationStarted.get()) {
170+
try? await Task.sleep(nanoseconds: 10_000_000)
171+
}
172+
173+
// Queue multiple operations while slow one is running
174+
group.addTask {
175+
await queue.run {
176+
await finalValue.set("middle1")
177+
}
178+
}
179+
180+
group.addTask {
181+
await queue.run {
182+
await finalValue.set("middle2")
183+
}
184+
}
185+
186+
group.addTask {
187+
await queue.run {
188+
await finalValue.set("last")
189+
}
190+
}
191+
192+
// Give time for all to be queued
193+
try? await Task.sleep(nanoseconds: 50_000_000)
194+
195+
// Release slow operation
196+
await slowOperationCanProceed.set(true)
197+
}
198+
199+
let result = await finalValue.get()
200+
XCTAssertEqual(result, "last", "Final state should reflect the last queued operation")
201+
}
202+
}
203+
204+
// MARK: - Helper Actor for Thread-Safe State
205+
206+
actor ActorBox<T> {
207+
private var value: T
208+
209+
init(_ initialValue: T) {
210+
self.value = initialValue
211+
}
212+
213+
func get() -> T {
214+
return value
215+
}
216+
217+
func set(_ newValue: T) {
218+
self.value = newValue
219+
}
220+
}
221+
222+
extension ActorBox where T == Int {
223+
@discardableResult
224+
func increment() -> Int {
225+
value += 1
226+
return value
227+
}
228+
229+
func decrement() {
230+
value -= 1
231+
}
232+
233+
func updateMax(_ candidate: Int) {
234+
if candidate > value {
235+
value = candidate
236+
}
237+
}
238+
}
239+
240+
extension ActorBox where T == [Int] {
241+
func append(_ element: Int) {
242+
value.append(element)
243+
}
244+
}
245+
246+
extension ActorBox where T == [String] {
247+
func append(_ element: String) {
248+
value.append(element)
249+
}
250+
}
251+
252+
extension ActorBox where T == Set<Int> {
253+
func insert(_ element: Int) {
254+
value.insert(element)
255+
}
256+
}

0 commit comments

Comments
 (0)