Skip to content

Commit 765acaa

Browse files
authored
Add Combine publishers for reading, writing, and executing queries in a transaction (#23)
1 parent 154d6f9 commit 765acaa

6 files changed

Lines changed: 500 additions & 14 deletions

File tree

Package.resolved

Lines changed: 11 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Package.swift

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,16 @@ let package = Package(
1212
targets: ["SQLite"]),
1313
],
1414
dependencies: [
15-
.package(name: "Synchronized", url: "https://github.com/shareup/synchronized.git", from: "2.1.0")
15+
.package(
16+
name: "Synchronized",
17+
url: "https://github.com/shareup/synchronized.git",
18+
from: "2.2.0"
19+
),
20+
.package(
21+
name: "CombineExtensions",
22+
url: "https://github.com/shareup/combine-extensions.git",
23+
from: "2.3.0"
24+
)
1625
],
1726
targets: [
1827
.target(
@@ -23,6 +32,7 @@ let package = Package(
2332
.testTarget(
2433
name: "SQLiteTests",
2534
dependencies: [
35+
.product(name: "CombineTestExtensions", package: "CombineExtensions"),
2636
"SQLite",
2737
]
2838
),

Sources/SQLite/SQLiteDatabase.swift

Lines changed: 165 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,13 @@ public final class SQLiteDatabase {
3030
private var _isOpen: Bool
3131

3232
private lazy var _queue: DispatchQueue = {
33-
let queue = DispatchQueue(label: "Database._queue")
33+
let queue = DispatchQueue(
34+
label: "app.shareup.sqlite.sqlitedatabase.queue",
35+
qos: .default,
36+
attributes: [],
37+
autoreleaseFrequency: .workItem,
38+
target: .global()
39+
)
3440
queue.setSpecific(key: self._queueKey, value: self._queueContext)
3541
return queue
3642
}()
@@ -61,7 +67,103 @@ public final class SQLiteDatabase {
6167
SQLiteDatabase.close(_connection)
6268
}
6369
}
70+
}
71+
72+
// MARK: -
73+
// MARK: Asynchronous queries
74+
75+
extension SQLiteDatabase {
76+
public func inTransactionPublisher<T>(
77+
_ block: @escaping (SQLiteDatabase) throws -> T
78+
) -> AnyPublisher<T, SQLiteError> {
79+
SQLiteFuture { [_queue] (promise) in
80+
_queue.async { [weak self] in
81+
guard let self = self else {
82+
promise(.failure(.onExecuteQueryAfterDeallocating))
83+
return
84+
}
85+
86+
do {
87+
let result = try self.inTransaction(block)
88+
promise(.success(result))
89+
} catch let error as SQLiteError {
90+
promise(.failure(error))
91+
} catch {
92+
promise(.failure(.onInternalError(error.localizedDescription)))
93+
}
94+
}
95+
}
96+
.eraseToAnyPublisher()
97+
}
98+
99+
public func writePublisher(
100+
_ sql: SQL,
101+
arguments: SQLiteArguments = [:]
102+
) -> AnyPublisher<Void, SQLiteError> {
103+
let prepareStatement = { [unowned self] () throws -> OpaquePointer in
104+
try self.cachedStatement(for: sql)
105+
}
106+
107+
let resetStatement = { (statement: OpaquePointer) -> Void in
108+
statement.resetAndClearBindings()
109+
}
64110

111+
return _executeAsync(
112+
sql,
113+
arguments: arguments,
114+
prepareStatement: prepareStatement,
115+
resetStatement: resetStatement
116+
)
117+
.tryMap { rows in throw SQLiteError.onWrite(rows) }
118+
.mapError { (error) -> SQLiteError in
119+
if let sqliteError = error as? SQLiteError {
120+
return sqliteError
121+
} else {
122+
return .onInternalError(error.localizedDescription)
123+
}
124+
}
125+
.eraseToAnyPublisher()
126+
}
127+
128+
public func readPublisher(_ sql: SQL, arguments: SQLiteArguments = [:]) -> AnyPublisher<[SQLiteRow], SQLiteError> {
129+
let prepareStatement = { [unowned self] () throws -> OpaquePointer in
130+
try self.cachedStatement(for: sql)
131+
}
132+
133+
let resetStatement = { (statement: OpaquePointer) -> Void in
134+
statement.resetAndClearBindings()
135+
}
136+
137+
return _executeAsync(
138+
sql,
139+
arguments: arguments,
140+
prepareStatement: prepareStatement,
141+
resetStatement: resetStatement
142+
)
143+
.eraseToAnyPublisher()
144+
}
145+
146+
public func readPublisher<T: SQLiteTransformable>(
147+
_ sql: SQL,
148+
arguments: SQLiteArguments
149+
) -> AnyPublisher<[T], SQLiteError> {
150+
readPublisher(sql, arguments: arguments)
151+
.tryMap { try $0.map { try T.init(row: $0) } }
152+
.mapError { (error) -> SQLiteError in
153+
if let sqliteError = error as? SQLiteError {
154+
return sqliteError
155+
} else {
156+
return .onInternalError(error.localizedDescription)
157+
}
158+
}
159+
.eraseToAnyPublisher()
160+
}
161+
}
162+
163+
// MARK: -
164+
// MARK: Synchronous queries
165+
166+
extension SQLiteDatabase {
65167
public func inTransaction<T>(_ block: (SQLiteDatabase) throws -> T) rethrows -> T {
66168
return try sync {
67169
_transactionCount += 1
@@ -125,6 +227,9 @@ public final class SQLiteDatabase {
125227
}
126228
}
127229

230+
// MARK: -
231+
// MARK: Tables and columns
232+
128233
extension SQLiteDatabase {
129234
public func tables() throws -> Array<String> {
130235
return try sync {
@@ -156,6 +261,9 @@ extension SQLiteDatabase {
156261
}
157262
}
158263

264+
// MARK: -
265+
// MARK: Combine Publishers observing SQL queries
266+
159267
extension SQLiteDatabase {
160268
public func publisher(
161269
_ sql: SQL,
@@ -191,6 +299,9 @@ extension SQLiteDatabase {
191299
}
192300
}
193301

302+
// MARK: -
303+
// MARK: Block-based observation of SQL queries
304+
194305
extension SQLiteDatabase {
195306
func observe(
196307
_ sql: SQL,
@@ -249,12 +360,18 @@ extension SQLiteDatabase {
249360
}
250361
}
251362

363+
// MARK: -
364+
// MARK: Equatable
365+
252366
extension SQLiteDatabase: Equatable {
253367
public static func == (lhs: SQLiteDatabase, rhs: SQLiteDatabase) -> Bool {
254368
return lhs._connection == rhs._connection
255369
}
256370
}
257371

372+
// MARK: -
373+
// MARK: Compile-time SQLite options
374+
258375
extension SQLiteDatabase {
259376
public var supportsJSON: Bool {
260377
return isCompileOptionEnabled("ENABLE_JSON1")
@@ -265,6 +382,9 @@ extension SQLiteDatabase {
265382
}
266383
}
267384

385+
// MARK: -
386+
// MARK: Vacuuming
387+
268388
extension SQLiteDatabase {
269389
public enum AutoVacuumMode: Int {
270390
case none = 0
@@ -313,6 +433,9 @@ extension SQLiteDatabase {
313433
}
314434
}
315435

436+
// MARK: -
437+
// MARK: SQLite hooks
438+
316439
extension SQLiteDatabase {
317440
func createUpdateHandler(_ block: @escaping (String) -> Void) {
318441
let updateBlock: UpdateHookCallback = { _, _, _, tableName, _ in
@@ -360,6 +483,9 @@ extension SQLiteDatabase {
360483
}
361484
}
362485

486+
// MARK: -
487+
// MARK: Private
488+
363489
extension SQLiteDatabase {
364490
private func _observe(
365491
_ sql: SQL,
@@ -389,10 +515,45 @@ extension SQLiteDatabase {
389515
}
390516

391517
extension SQLiteDatabase {
392-
private func _execute(_ sql: SQL, statement: OpaquePointer,
393-
arguments: SQLiteArguments) throws -> Array<SQLiteRow> {
518+
private func _executeAsync(
519+
_ sql: SQL,
520+
arguments: SQLiteArguments,
521+
prepareStatement: @escaping () throws -> OpaquePointer,
522+
resetStatement: @escaping (OpaquePointer) -> Void
523+
) -> SQLiteFuture<[SQLiteRow]> {
524+
SQLiteFuture { [_queue] (promise) in
525+
_queue.async { [weak self] in
526+
guard let self = self else {
527+
promise(.failure(.onExecuteQueryAfterDeallocating))
528+
return
529+
}
530+
531+
do {
532+
let statement = try prepareStatement()
533+
defer { resetStatement(statement) }
534+
535+
let result = try self._execute(
536+
sql,
537+
statement: statement,
538+
arguments: arguments
539+
)
540+
promise(.success(result))
541+
} catch let error as SQLiteError {
542+
promise(.failure(error))
543+
} catch {
544+
promise(.failure(.onInternalError(error.localizedDescription)))
545+
}
546+
}
547+
}
548+
}
549+
550+
private func _execute(
551+
_ sql: SQL,
552+
statement: OpaquePointer,
553+
arguments: SQLiteArguments
554+
) throws -> [SQLiteRow] {
394555
assert(isOnDatabaseQueue)
395-
guard _isOpen else { assertionFailure("Database is closed"); return [] }
556+
guard _isOpen else { throw SQLiteError.onExecuteQueryWithoutOpenDatabase }
396557

397558
try statement.bind(arguments: arguments)
398559
let (result, output) = try statement.evaluate()

Sources/SQLite/SQLiteError.swift

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import Foundation
22
import SQLite3
33

4-
public enum SQLiteError: Error {
4+
public enum SQLiteError: Error, Equatable {
55
case onInternalError(String)
66
case onOpen(Int32, String)
77
case onClose(Int32)
@@ -11,6 +11,9 @@ public enum SQLiteError: Error {
1111
case onStep(Int32, String)
1212
case onWrite(Array<SQLiteRow>)
1313
case onGetColumnType(Int32)
14+
case onBeginTransactionAfterDeallocating
15+
case onExecuteQueryWithoutOpenDatabase
16+
case onExecuteQueryAfterDeallocating
1417
case onCreateFunction(String, Int32)
1518
case onRemoveFunction(String, Int32)
1619
case onGetColumnInTable(String)
@@ -44,12 +47,18 @@ extension SQLiteError: CustomStringConvertible {
4447
return "Could not get index for '\(parameterName)'"
4548
case .onBindParameter(let code, let index, let value):
4649
return "Could not bind \(value) to \(index): \(string(for: code))"
47-
case .onStep(let code, let sql):
48-
return "Could not execute SQL '\(sql)': \(string(for: code))"
50+
case .onBeginTransactionAfterDeallocating:
51+
return "Tried to begin a transaction after deallocating"
52+
case .onExecuteQueryWithoutOpenDatabase:
53+
return "Tried to execute a query without an open database connection"
54+
case .onExecuteQueryAfterDeallocating:
55+
return "Tried to execute a query after deallocating"
4956
case .onWrite(let result):
5057
return "Write returned results: '\(result)'"
5158
case .onGetColumnType(let type):
5259
return "Invalid column type: \(type)"
60+
case .onStep(let code, let sql):
61+
return "Could not execute SQL '\(sql)': \(string(for: code))"
5362
case .onCreateFunction(let name, let code):
5463
return "Could not create function '\(name)': \(string(for: code))"
5564
case .onRemoveFunction(let name, let code):

0 commit comments

Comments
 (0)