Skip to content

Commit 6597465

Browse files
feat: Performance in bursts of context updates (second attempt) (#86)
This PR wants to merge in #84, to improve the performance of the updateContext queue and avoid queue build-up by coalescing pending tasks and ensure last-one-wins operations. It might be a better alternative to #85: the latter was causing coalesced tasks to return immediately instead of waiting for the latest task to complete successfully, which in turn would make `setEvaluationContext` return before the context is actually updated and ready. --------- Signed-off-by: Fabrizio Demaria <fabrizio.f.demaria@gmail.com>
1 parent d46b251 commit 6597465

File tree

4 files changed

+772
-189
lines changed

4 files changed

+772
-189
lines changed
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
import Foundation
2+
3+
/// Simple serial async task queue that coalesces operations.
4+
/// Only the currently running task and at most one pending operation are kept.
5+
/// Intermediate operations are skipped to avoid queue buildup.
6+
internal actor AsyncCoalescingSerialQueue {
7+
private var currentTask: Task<Void, Never>?
8+
private var pendingOperation: (() async -> Void)?
9+
private var pendingContinuations: [CheckedContinuation<Void, Never>] = []
10+
private var operationCounter: Int = 0
11+
12+
/// Verbose mode controls whether debug logging is enabled
13+
private let verbose: Bool
14+
15+
/// Initialize the queue with optional verbose logging
16+
/// - Parameter verbose: If true, detailed debug logs will be printed with [ASQ] prefix
17+
init(verbose: Bool = false) {
18+
self.verbose = verbose
19+
}
20+
21+
/// Runs the given operation serially. If an operation is already running,
22+
/// this operation replaces any previously pending operation (which gets skipped).
23+
/// All callers whose operations were replaced will wait for the latest operation to complete.
24+
func run(_ operation: @Sendable @escaping () async -> Void) async {
25+
await withCheckedContinuation { continuation in
26+
operationCounter += 1
27+
let operationId = operationCounter
28+
29+
if verbose {
30+
print("[ASQ] 🔵 run() called - Operation #\(operationId)")
31+
print("[ASQ] ├─ currentTask == nil: \(currentTask == nil)")
32+
print("[ASQ] ├─ pendingOperation == nil (before): \(pendingOperation == nil)")
33+
print("[ASQ] ├─ pendingContinuations.count (before): \(pendingContinuations.count)")
34+
}
35+
36+
// Replace any pending operation with this new one
37+
let hadPendingOperation = pendingOperation != nil
38+
pendingOperation = operation
39+
pendingContinuations.append(continuation)
40+
41+
if verbose {
42+
if hadPendingOperation {
43+
print("[ASQ] ├─ ⚠️ REPLACED previous pending operation with Operation #\(operationId)")
44+
} else {
45+
print("[ASQ] ├─ ✓ Set Operation #\(operationId) as pending operation")
46+
}
47+
print("[ASQ] ├─ pendingContinuations.count (after): \(pendingContinuations.count)")
48+
}
49+
50+
// If nothing is currently running, start processing
51+
if currentTask == nil {
52+
if verbose {
53+
print("[ASQ] └─ ▶️ No task running, calling processNext() for Operation #\(operationId)")
54+
}
55+
processNext()
56+
} else {
57+
if verbose {
58+
print("[ASQ] └─ ⏸️ Task already running, Operation #\(operationId) will wait")
59+
}
60+
}
61+
}
62+
}
63+
64+
private func processNext() {
65+
if verbose {
66+
print("[ASQ] 🟢 processNext() called")
67+
print("[ASQ] ├─ pendingOperation == nil: \(pendingOperation == nil)")
68+
print("[ASQ] ├─ pendingContinuations.count: \(pendingContinuations.count)")
69+
}
70+
71+
guard let operation = pendingOperation else {
72+
// No pending work
73+
if verbose {
74+
print("[ASQ] ├─ ⛔ No pending operation, cleaning up")
75+
}
76+
currentTask = nil
77+
if verbose {
78+
print("[ASQ] └─ ✓ currentTask set to nil, queue is now idle")
79+
}
80+
return
81+
}
82+
83+
// Clear pending state and capture continuations
84+
pendingOperation = nil
85+
let continuations = pendingContinuations
86+
pendingContinuations = []
87+
88+
if verbose {
89+
print("[ASQ] ├─ ✓ Captured \(continuations.count) continuation(s) to resume")
90+
print("[ASQ] ├─ ✓ Cleared pendingOperation and pendingContinuations")
91+
print("[ASQ] └─ 🚀 Starting new Task to execute operation")
92+
}
93+
94+
// Start the task
95+
currentTask = Task { [weak self, verbose] in
96+
if verbose {
97+
print("[ASQ] 🔄 Task execution started")
98+
}
99+
await operation()
100+
if verbose {
101+
print("[ASQ] ✅ Task execution completed")
102+
}
103+
104+
// Resume all waiting callers
105+
if verbose {
106+
print("[ASQ] 📤 Resuming \(continuations.count) continuation(s)")
107+
}
108+
for (index, continuation) in continuations.enumerated() {
109+
if verbose {
110+
print("[ASQ] ├─ Resuming continuation #\(index + 1)")
111+
}
112+
continuation.resume()
113+
}
114+
if verbose {
115+
print("[ASQ] ✓ All continuations resumed")
116+
}
117+
118+
// Process next operation if any arrived while we were running
119+
if verbose {
120+
print("[ASQ] 🔁 Checking for next operation...")
121+
}
122+
await self?.processNext()
123+
}
124+
}
125+
}

Sources/OpenFeature/OpenFeatureAPI.swift

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,12 @@
11
import Combine
22
import Foundation
33

4-
/// Simple serial async task queue for serializing operations
5-
private actor AsyncSerialQueue {
6-
private var last: Task<Void, Never>?
7-
8-
/// Runs the given operation after previously enqueued work completes.
9-
func run(_ operation: @Sendable @escaping () async -> Void) async {
10-
let previous = last
11-
let task = Task {
12-
_ = await previous?.result
13-
await operation()
14-
}
15-
last = task
16-
await task.value
17-
}
18-
}
19-
204
/// A global singleton which holds base configuration for the OpenFeature library.
215
/// Configuration here will be shared across all ``Client``s.
226
public class OpenFeatureAPI {
237
private let eventHandler = EventHandler()
248
private let stateQueue = DispatchQueue(label: "com.openfeature.state.queue")
25-
private let atomicOperationsQueue = AsyncSerialQueue()
9+
private let contextQueue: AsyncCoalescingSerialQueue
2610

2711
private(set) var providerSubject = CurrentValueSubject<FeatureProvider?, Never>(nil)
2812
private(set) var evaluationContext: EvaluationContext?
@@ -33,6 +17,9 @@ public class OpenFeatureAPI {
3317
static public let shared = OpenFeatureAPI()
3418

3519
public init() {
20+
// Check for OPENFEATURE_ASQ_VERBOSE environment variable to enable verbose logging
21+
let verboseMode = ProcessInfo.processInfo.environment["OPENFEATURE_ASQ_VERBOSE"] != nil
22+
contextQueue = AsyncCoalescingSerialQueue(verbose: verboseMode)
3623
}
3724

3825
/**
@@ -152,7 +139,7 @@ public class OpenFeatureAPI {
152139
}
153140

154141
private func setProviderInternal(provider: FeatureProvider, initialContext: EvaluationContext? = nil) async {
155-
await atomicOperationsQueue.run { [self] in
142+
await contextQueue.run { [self] in
156143
// Set initial state atomically
157144
stateQueue.sync {
158145
self.providerStatus = .notReady
@@ -189,7 +176,7 @@ public class OpenFeatureAPI {
189176
}
190177

191178
private func updateContext(evaluationContext: EvaluationContext) async {
192-
await atomicOperationsQueue.run { [self] in
179+
await contextQueue.run { [self] in
193180
// Get old context and set new context atomically
194181
let (oldContext, provider) = stateQueue.sync { () -> (EvaluationContext?, FeatureProvider?) in
195182
let oldContext = self.evaluationContext

0 commit comments

Comments
 (0)