Skip to content

Commit 6d7c469

Browse files
authored
Refactor SQLiteFuture to allow it to be retried (#30)
1 parent 2ccc39c commit 6d7c469

9 files changed

Lines changed: 133 additions & 158 deletions

Package.resolved

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Package.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ let package = Package(
1515
.package(
1616
name: "CombineExtensions",
1717
url: "https://github.com/shareup/combine-extensions.git",
18-
from: "2.9.0"
18+
from: "3.1.0"
1919
),
2020
.package(
2121
name: "Synchronized",
@@ -32,6 +32,7 @@ let package = Package(
3232
.testTarget(
3333
name: "SQLiteTests",
3434
dependencies: [
35+
.product(name: "CombineExtensionsDynamic", package: "CombineExtensions"),
3536
.product(name: "CombineTestExtensions", package: "CombineExtensions"),
3637
"SQLite",
3738
]),

Sources/SQLite/SQLiteDatabase.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ extension SQLiteDatabase {
179179

180180
public func readPublisher<T: SQLiteTransformable>(
181181
_ sql: SQL,
182-
arguments: SQLiteArguments
182+
arguments: SQLiteArguments = [:]
183183
) -> AnyPublisher<[T], SQLiteError> {
184184
readPublisher(sql, arguments: arguments)
185185
.tryMap { try $0.map { try T.init(row: $0) } }
@@ -547,6 +547,7 @@ extension SQLiteDatabase {
547547

548548
private func prepare(_ sql: SQL) throws -> SQLiteStatement {
549549
precondition(SQLiteQueue.isCurrentQueue)
550+
guard _isOpen else { throw SQLiteError.databaseIsClosed }
550551
return try SQLiteStatement.prepare(sql, in: self)
551552
}
552553
}

Sources/SQLite/SQLiteError.swift

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,11 @@ extension SQLiteError {
8989
guard let code = code else { return false }
9090
return code == SQLITE_BUSY
9191
}
92+
93+
public var isClosed: Bool {
94+
guard case .databaseIsClosed = self else { return false }
95+
return true
96+
}
9297
}
9398

9499
extension SQLiteError: CustomStringConvertible {

Sources/SQLite/SQLiteFuture.swift

Lines changed: 63 additions & 151 deletions
Original file line numberDiff line numberDiff line change
@@ -2,145 +2,53 @@ import Foundation
22
import Combine
33
import Synchronized
44

5-
class SQLiteFuture<Output>: Publisher {
5+
struct SQLiteFuture<Output>: Publisher {
66
typealias Failure = SQLiteError
77

88
typealias Promise = (Result<Output, Failure>) -> Void
99

10-
private enum State {
11-
case notStarted((@escaping SQLiteFuture.Promise) -> Void)
12-
case inProgress
13-
case failed(SQLiteError)
14-
case finished(Output)
15-
16-
var result: Result<Output, SQLiteError>? {
17-
switch self {
18-
case .notStarted, .inProgress:
19-
return nil
20-
21-
case let .failed(error):
22-
return .failure(error)
23-
24-
case let .finished(output):
25-
return .success(output)
26-
}
27-
}
28-
}
29-
30-
private var state: State
31-
private var subscriptions = Set<SQLiteFutureSubscription<Output>>()
32-
private var lock = RecursiveLock()
10+
private let attemptToFulfill: (@escaping Promise) -> Void
3311

3412
init(
3513
_ attemptToFulfill: @escaping (@escaping Promise) -> Void
3614
) {
37-
self.state = .notStarted(attemptToFulfill)
15+
self.attemptToFulfill = attemptToFulfill
3816
}
3917

4018
func receive<S: Subscriber>(
4119
subscriber: S
4220
) where Failure == S.Failure, Output == S.Input {
4321
let subscription = SQLiteFutureSubscription(
44-
subscriber: subscriber,
45-
resultProvider: resultProvider
22+
attemptToFulfill: attemptToFulfill,
23+
subscriber: subscriber
4624
)
47-
lock.locked { let _ = subscriptions.insert(subscription) }
4825
subscriber.receive(subscription: subscription)
4926
}
27+
}
5028

51-
private func notifySubscribers() {
52-
lock.locked {
53-
guard let result = state.result else { return }
54-
55-
var notified = Set<SQLiteFutureSubscription<Output>>()
56-
for sub in subscriptions {
57-
if sub.sendResult(result) {
58-
notified.insert(sub)
59-
}
60-
}
61-
subscriptions.subtract(notified)
62-
}
63-
}
64-
65-
enum NextAction {
66-
case doNothing
67-
case attemptToFulfill((@escaping Promise) -> Void)
68-
case sendOutput(Output)
69-
case sendFailure(SQLiteError)
29+
private final class SQLiteFutureSubscription<Output, S: Subscriber>: Subscription
30+
where
31+
S.Input == Output,
32+
S.Failure == SQLiteError
33+
{
34+
private enum State {
35+
case pending
36+
case fulfilled(Result<S.Input, S.Failure>)
37+
case finished
7038
}
7139

72-
private var resultProvider: () -> Result<Output, SQLiteError>? {
73-
{ [weak self] in
74-
guard let self = self else { return nil }
75-
76-
let nextAction = self.lock.locked { () -> NextAction in
77-
switch self.state {
78-
case let .notStarted(attemptToFulfill):
79-
self.state = .inProgress
80-
return .attemptToFulfill(attemptToFulfill)
81-
82-
case .inProgress:
83-
return .doNothing
84-
85-
case let .finished(output):
86-
return .sendOutput(output)
87-
88-
case let .failed(error):
89-
return .sendFailure(error)
90-
}
91-
}
92-
93-
switch nextAction {
94-
case let .attemptToFulfill(attemptToFulfill):
95-
// We can't use `[weak self]` here or the block
96-
// will be deallocated immediately after invoked.
97-
// This should not create a retain cycle because
98-
// `self` does not retain the block.
99-
attemptToFulfill({ result in
100-
self.lock.locked {
101-
guard case .inProgress = self.state else { return }
102-
103-
switch result {
104-
case let .success(output):
105-
self.state = .finished(output)
106-
107-
case let .failure(error):
108-
self.state = .failed(error)
109-
}
110-
}
111-
112-
self.notifySubscribers()
113-
})
114-
return nil
115-
116-
case .doNothing:
117-
return nil
118-
119-
case let .sendOutput(output):
120-
return .success(output)
121-
122-
case let .sendFailure(error):
123-
return .failure(error)
124-
}
125-
}
126-
}
127-
}
40+
private var state: State = .pending
41+
private var hasDemand = false
12842

129-
private final class SQLiteFutureSubscription<Output>: Subscription, Hashable {
130-
let id: CombineIdentifier
43+
private var subscriber: S?
44+
private let lock = Lock()
13145

132-
private var hasDemand = false
133-
private var subscriber: AnySubscriber<Output, SQLiteError>?
134-
private let resultProvider: () -> Result<Output, SQLiteError>?
135-
private let lock = RecursiveLock()
136-
137-
init<S: Subscriber>(
138-
subscriber: S,
139-
resultProvider: @escaping () -> Result<Output, SQLiteError>?
140-
) where S.Input == Output, S.Failure == SQLiteError {
141-
self.id = subscriber.combineIdentifier
142-
self.subscriber = AnySubscriber(subscriber)
143-
self.resultProvider = resultProvider
46+
init(
47+
attemptToFulfill: (@escaping SQLiteFuture<Output>.Promise) -> Void,
48+
subscriber: S
49+
) {
50+
self.subscriber = subscriber
51+
attemptToFulfill({ result in self.fulfill(with: result) })
14452
}
14553

14654
deinit {
@@ -149,50 +57,54 @@ private final class SQLiteFutureSubscription<Output>: Subscription, Hashable {
14957

15058
func request(_ demand: Subscribers.Demand) {
15159
guard demand != .none else { return }
152-
lock.locked { self.hasDemand = true }
153-
guard let result = resultProvider() else { return }
154-
let _ = sendResult(result)
155-
}
15660

157-
func cancel() {
158-
lock.locked { subscriber = nil }
159-
}
160-
161-
func sendResult(_ result: Result<Output, SQLiteError>) -> Bool {
162-
typealias Sub = AnySubscriber<Output, SQLiteError>
61+
let subscriberAndResult: (S, Result<S.Input, S.Failure>)? = lock.locked {
62+
hasDemand = true
63+
guard let sub = subscriber, case let .fulfilled(result) = state
64+
else { return nil }
65+
state = .finished
66+
subscriber = nil
67+
return (sub, result)
68+
}
16369

164-
let (didFinish, sub): (Bool, Sub?) = lock.locked {
165-
// If we don't have any demand, we return `false` so that
166-
// `SQLiteFuture` holds on to this subscription until it
167-
// receives some demand and calls `resultProvider()`.
168-
guard hasDemand else { return (false, nil) }
70+
guard let (subscriber, result) = subscriberAndResult else { return }
71+
notify(subscriber: subscriber, result: result)
72+
}
16973

170-
// If we don't have a subscriber, we want to be removed from
171-
// `SQLiteFuture`'s set of subscriptions. So, we need to
172-
// return `true`.
173-
guard let sub = subscriber else { return (true, nil) }
74+
func cancel() {
75+
lock.locked {
17476
subscriber = nil
175-
176-
return (true, sub)
77+
state = .finished
17778
}
79+
}
17880

179-
switch result {
180-
case let .success(output):
181-
let _ = sub?.receive(output)
182-
sub?.receive(completion: .finished)
183-
184-
case let .failure(error):
185-
sub?.receive(completion: .failure(error))
81+
private func fulfill(with result: Result<S.Input, S.Failure>) {
82+
let _subscriber: S? = lock.locked {
83+
guard case .pending = state, let sub = subscriber
84+
else { return nil }
85+
86+
if hasDemand {
87+
state = .finished
88+
subscriber = nil
89+
return sub
90+
} else {
91+
state = .fulfilled(result)
92+
return nil
93+
}
18694
}
18795

188-
return didFinish
96+
guard let subscriber = _subscriber else { return }
97+
notify(subscriber: subscriber, result: result)
18998
}
19099

191-
func hash(into hasher: inout Hasher) {
192-
hasher.combine(id)
193-
}
100+
private func notify(subscriber: S, result: Result<S.Input, S.Failure>) {
101+
switch result {
102+
case let .success(rows):
103+
_ = subscriber.receive(rows)
104+
subscriber.receive(completion: .finished)
194105

195-
static func == (lhs: SQLiteFutureSubscription, rhs: SQLiteFutureSubscription) -> Bool {
196-
lhs.id == rhs.id
106+
case let .failure(error):
107+
subscriber.receive(completion: .failure(error))
108+
}
197109
}
198110
}

Sources/SQLite/SQLiteStatementResultsPublisher.swift renamed to Sources/SQLite/SQLiteResultsPublisher.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ extension Publisher where Output == SQLiteDatabaseChange, Failure == Never {
88
sql: SQL,
99
arguments: SQLiteArguments = [:],
1010
database: SQLiteDatabase
11-
) -> SQLiteStatementResultsPublisher<Self> {
12-
SQLiteStatementResultsPublisher(
11+
) -> SQLiteResultsPublisher<Self> {
12+
SQLiteResultsPublisher(
1313
upstream: self,
1414
sql: sql,
1515
arguments: arguments,
@@ -18,7 +18,7 @@ extension Publisher where Output == SQLiteDatabaseChange, Failure == Never {
1818
}
1919
}
2020

21-
struct SQLiteStatementResultsPublisher<Upstream: Publisher>: Publisher where
21+
struct SQLiteResultsPublisher<Upstream: Publisher>: Publisher where
2222
Upstream.Output == SQLiteDatabaseChange, Upstream.Failure == Never
2323
{
2424
typealias Output = [SQLiteRow]

Tests/SQLiteTests/SQLiteCrossProcessMonitorTests.swift

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ class SQLiteCrossProcessMonitorTests: XCTestCase {
2020
let db = try SQLiteDatabase.makeShared(path: dbPath)
2121
try db.write(createTable)
2222
XCTAssertEqual(["test"], try db.tables())
23+
try db.close()
2324
}
2425
}
2526

@@ -48,6 +49,9 @@ class SQLiteCrossProcessMonitorTests: XCTestCase {
4849
)
4950

5051
wait(for: [ex1, ex2], timeout: 4) // Coordinated writes can be very slow
52+
53+
try db1.close()
54+
try db2.close()
5155
}
5256
}
5357

@@ -90,6 +94,8 @@ class SQLiteCrossProcessMonitorTests: XCTestCase {
9094

9195
wait(for: [outputEx], timeout: 2)
9296
wait(for: [duplicateEx], timeout: 2) // Coordinated writes can be very slow
97+
98+
try db.close()
9399
}
94100
}
95101
}

Tests/SQLiteTests/SQLiteDatabaseTests.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ class SQLiteDatabaseTests: XCTestCase {
7777
XCTAssertEqual(2, rows.count)
7878
XCTAssertEqual(one, rows.first)
7979
XCTAssertEqual(three, rows.last)
80+
81+
try db.close()
8082
}
8183
}
8284

0 commit comments

Comments
 (0)