diff --git a/CHANGELOG.md b/CHANGELOG.md index fb66b71..eef947f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## 1.0.0-Beta.8 + +* Update underlying package which has a fix to avoid `watchQuery` race conditions + ## 1.0.0-Beta.7 * Fixed an issue where throwing exceptions in the query `mapper` could cause a runtime crash. diff --git a/Demo/PowerSyncExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved b/Demo/PowerSyncExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved index 03c7ebd..1867b01 100644 --- a/Demo/PowerSyncExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved +++ b/Demo/PowerSyncExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved @@ -15,8 +15,8 @@ "kind" : "remoteSourceControl", "location" : "https://github.com/powersync-ja/powersync-kotlin.git", "state" : { - "revision" : "203db74889df8a20e3c6ac38aede6b0186d2e3b5", - "version" : "1.0.0-BETA23.0" + "revision" : "a683c43f9d432fdc9257626659c4da10f67ec8fb", + "version" : "1.0.0-BETA24.0" } }, { diff --git a/Demo/PowerSyncExample/Components/AddTodoListView.swift b/Demo/PowerSyncExample/Components/AddTodoListView.swift index 0f37a0b..4149ea8 100644 --- a/Demo/PowerSyncExample/Components/AddTodoListView.swift +++ b/Demo/PowerSyncExample/Components/AddTodoListView.swift @@ -3,25 +3,41 @@ import SwiftUI struct AddTodoListView: View { @Environment(SystemManager.self) private var system + @State private var isLoading = false @Binding var newTodo: NewTodo let listId: String let completion: (Result) -> Void - + var body: some View { Section { TextField("Description", text: $newTodo.description) - Button("Save") { - Task{ + + Button { + Task { + isLoading = true + defer { isLoading = false } + do { try await system.insertTodo(newTodo, listId) - await completion(.success(true)) + completion(.success(true)) } catch { - await completion(.failure(error)) + completion(.failure(error)) throw error } } + } label: { + HStack { + Text("Save") + if isLoading { + Spacer() + ProgressView() + .progressViewStyle(CircularProgressViewStyle()) + } + } + .frame(maxWidth: .infinity, alignment: .leading) } + .disabled(isLoading) } } } diff --git a/Demo/PowerSyncExample/Components/TodoListView.swift b/Demo/PowerSyncExample/Components/TodoListView.swift index b006cd0..d4d11aa 100644 --- a/Demo/PowerSyncExample/Components/TodoListView.swift +++ b/Demo/PowerSyncExample/Components/TodoListView.swift @@ -10,69 +10,140 @@ struct TodoListView: View { @State private var error: Error? @State private var newTodo: NewTodo? @State private var editing: Bool = false + @State private var isLoadingTodos: Bool = false + @State private var batchInsertProgress: Double? = nil var body: some View { - List { - if let error { - ErrorText(error) - } + ZStack { + List { + if let error { + ErrorText(error) + } - IfLet($newTodo) { $newTodo in - AddTodoListView(newTodo: $newTodo, listId: listId) { result in - withAnimation { - self.newTodo = nil + IfLet($newTodo) { $newTodo in + AddTodoListView(newTodo: $newTodo, listId: listId) { result in + withAnimation { + self.newTodo = nil + } } } - } - ForEach(todos) { todo in - TodoListRow(todo: todo) { - Task { - try await toggleCompletion(of: todo) + if let progress = batchInsertProgress { + Section { + VStack(alignment: .leading, spacing: 8) { + Text("Inserting todos...") + .font(.subheadline) + .foregroundColor(.secondary) + + ProgressView(value: progress) + .progressViewStyle(LinearProgressViewStyle()) + + Text("\(Int(progress * 100))% complete") + .font(.caption) + .foregroundColor(.secondary) + } + .padding(.vertical, 8) } } - } - .onDelete { indexSet in - Task { - await delete(at: indexSet) + + ForEach(todos) { todo in + TodoListRow(todo: todo) { + Task { + try await toggleCompletion(of: todo) + } + } + } + .onDelete { indexSet in + Task { + await delete(at: indexSet) + } } } - } - .animation(.default, value: todos) - .navigationTitle("Todos") - .toolbar { - ToolbarItem(placement: .primaryAction) { - if (newTodo == nil) { - Button { - withAnimation { - newTodo = .init( - listId: listId, - isComplete: false, - description: "" - ) + .animation(.default, value: todos) + .animation(.default, value: batchInsertProgress) + .navigationTitle("Todos") + .toolbar { + ToolbarItem(placement: .primaryAction) { + if batchInsertProgress != nil { + // Show nothing while batch inserting + EmptyView() + } else if (newTodo == nil) { + Menu { + Button { + withAnimation { + newTodo = .init( + listId: listId, + isComplete: false, + description: "" + ) + } + } label: { + Label("Add Single Todo", systemImage: "plus") + } + + Button { + Task { + withAnimation { + batchInsertProgress = 0 + } + + do { + try await system.insertManyTodos(listId: listId) { progress in + withAnimation { + batchInsertProgress = progress + if progress >= 1.0 { + // Small delay to show 100% before hiding + DispatchQueue.main.asyncAfter(deadline: .now() + 0.5) { + withAnimation { + batchInsertProgress = nil + } + } + } + } + } + } catch { + self.error = error + withAnimation { + batchInsertProgress = nil + } + } + } + } label: { + Label("Add Many Todos", systemImage: "plus.square.on.square") + } + } label: { + Label("Add", systemImage: "plus") } - } label: { - Label("Add", systemImage: "plus") - } - } else { - Button("Cancel", role: .cancel) { - withAnimation { - newTodo = nil + } else { + Button("Cancel", role: .cancel) { + withAnimation { + newTodo = nil + } } } } } + + if isLoadingTodos && todos.isEmpty { + ProgressView() + .progressViewStyle(CircularProgressViewStyle()) + .scaleEffect(1.5) + .frame(maxWidth: .infinity, maxHeight: .infinity) + .background(Color.black.opacity(0.05)) + } } .task { + isLoadingTodos = true await system.watchTodos(listId) { tds in withAnimation { self.todos = IdentifiedArrayOf(uniqueElements: tds) + self.isLoadingTodos = false } } } } - func toggleCompletion(of todo: Todo) async { + func toggleCompletion(of todo: Todo) async throws { var updatedTodo = todo updatedTodo.isComplete.toggle() do { @@ -89,7 +160,6 @@ struct TodoListView: View { let todosToDelete = offset.map { todos[$0] } try await system.deleteTodo(id: todosToDelete[0].id) - } catch { self.error = error } diff --git a/Demo/PowerSyncExample/PowerSync/SupabaseConnector.swift b/Demo/PowerSyncExample/PowerSync/SupabaseConnector.swift index fe5b184..14e4ae9 100644 --- a/Demo/PowerSyncExample/PowerSync/SupabaseConnector.swift +++ b/Demo/PowerSyncExample/PowerSync/SupabaseConnector.swift @@ -81,36 +81,37 @@ class SupabaseConnector: PowerSyncBackendConnector { } override func uploadData(database: PowerSyncDatabaseProtocol) async throws { + guard let transaction = try await database.getCrudBatch() else { return } + + var lastEntry: CrudEntry? + do { + try await Task.detached { + for entry in transaction.crud { + lastEntry = entry + let tableName = entry.table + + let table = self.client.from(tableName) + + switch entry.op { + case .put: + var data: [String: AnyCodable] = entry.opData?.mapValues { AnyCodable($0) } ?? [:] + data["id"] = AnyCodable(entry.id) + try await table.upsert(data).execute(); + case .patch: + guard let opData = entry.opData else { continue } + let encodableData = opData.mapValues { AnyCodable($0) } + try await table.update(encodableData).eq("id", value: entry.id).execute() + case .delete: + try await table.delete().eq( "id", value: entry.id).execute() + } + } - guard let transaction = try await database.getNextCrudTransaction() else { return } - - var lastEntry: CrudEntry? - do { - for entry in transaction.crud { - lastEntry = entry - let tableName = entry.table - - let table = client.from(tableName) - - switch entry.op { - case .put: - var data: [String: AnyCodable] = entry.opData?.mapValues { AnyCodable($0) } ?? [:] - data["id"] = AnyCodable(entry.id) - try await table.upsert(data).execute(); - case .patch: - guard let opData = entry.opData else { continue } - let encodableData = opData.mapValues { AnyCodable($0) } - try await table.update(encodableData).eq("id", value: entry.id).execute() - case .delete: - try await table.delete().eq( "id", value: entry.id).execute() - } - } - - _ = try await transaction.complete.invoke(p1: nil) + _ = try await transaction.complete.invoke(p1: nil) + }.value - } catch { - if let errorCode = PostgresFatalCodes.extractErrorCode(from: error), - PostgresFatalCodes.isFatalError(errorCode) { + } catch { + if let errorCode = PostgresFatalCodes.extractErrorCode(from: error), + PostgresFatalCodes.isFatalError(errorCode) { /// Instead of blocking the queue with these errors, /// discard the (rest of the) transaction. /// @@ -123,9 +124,9 @@ class SupabaseConnector: PowerSyncBackendConnector { return } - print("Data upload error - retrying last entry: \(lastEntry!), \(error)") - throw error - } + print("Data upload error - retrying last entry: \(lastEntry!), \(error)") + throw error + } } deinit { diff --git a/Demo/PowerSyncExample/PowerSync/SystemManager.swift b/Demo/PowerSyncExample/PowerSync/SystemManager.swift index 8f88dec..6ce9792 100644 --- a/Demo/PowerSyncExample/PowerSync/SystemManager.swift +++ b/Demo/PowerSyncExample/PowerSync/SystemManager.swift @@ -29,106 +29,136 @@ class SystemManager { } func signOut() async throws -> Void { - try await db.disconnectAndClear() - try await connector.client.auth.signOut() + try await Task.detached(priority: .userInitiated) { + try await self.db.disconnectAndClear() + try await self.connector.client.auth.signOut() + }.value } func watchLists(_ callback: @escaping (_ lists: [ListContent]) -> Void ) async { - do { - for try await lists in try self.db.watch( - sql: "SELECT * FROM \(LISTS_TABLE)", - parameters: [], - mapper: { cursor in - try ListContent( - id: cursor.getString(name: "id"), - name: cursor.getString(name: "name"), - createdAt: cursor.getString(name: "created_at"), - ownerId: cursor.getString(name: "owner_id") - ) + Task.detached(priority: .high) { + do { + for try await lists in try self.db.watch( + sql: "SELECT * FROM \(LISTS_TABLE)", + parameters: [], + mapper: { cursor in + try ListContent( + id: cursor.getString(name: "id"), + name: cursor.getString(name: "name"), + createdAt: cursor.getString(name: "created_at"), + ownerId: cursor.getString(name: "owner_id") + ) + } + ) { + callback(lists) } - ) { - callback(lists) + } catch { + print("Error in watch: \(error)") } - } catch { - print("Error in watch: \(error)") } } func insertList(_ list: NewListContent) async throws { - _ = try await self.db.execute( - sql: "INSERT INTO \(LISTS_TABLE) (id, created_at, name, owner_id) VALUES (uuid(), datetime(), ?, ?)", - parameters: [list.name, connector.currentUserID] - ) + try await Task.detached(priority: .userInitiated) { + _ = try await self.db.execute( + sql: "INSERT INTO \(LISTS_TABLE) (id, created_at, name, owner_id) VALUES (uuid(), datetime(), ?, ?)", + parameters: [list.name, self.connector.currentUserID] + ) + }.value } func deleteList(id: String) async throws { - _ = try await db.writeTransaction(callback: { transaction in - _ = try transaction.execute( - sql: "DELETE FROM \(LISTS_TABLE) WHERE id = ?", - parameters: [id] - ) - _ = try transaction.execute( - sql: "DELETE FROM \(TODOS_TABLE) WHERE list_id = ?", - parameters: [id] - ) - return - }) - } + try await Task.detached(priority: .userInitiated) { + _ = try await self.db.writeTransaction(callback: { transaction in + _ = try transaction.execute( + sql: "DELETE FROM \(LISTS_TABLE) WHERE id = ?", + parameters: [id] + ) + _ = try transaction.execute( + sql: "DELETE FROM \(TODOS_TABLE) WHERE list_id = ?", + parameters: [id] + ) + return + }) + }.value + } func watchTodos(_ listId: String, _ callback: @escaping (_ todos: [Todo]) -> Void ) async { - do { - for try await todos in try self.db.watch( - sql: "SELECT * FROM \(TODOS_TABLE) WHERE list_id = ?", - parameters: [listId], - mapper: { cursor in - try Todo( - id: cursor.getString(name: "id"), - listId: cursor.getString(name: "list_id"), - photoId: cursor.getStringOptional(name: "photo_id"), - description: cursor.getString(name: "description"), - isComplete: cursor.getBoolean(name: "completed"), - createdAt: cursor.getString(name: "created_at"), - completedAt: cursor.getStringOptional(name: "completed_at"), - createdBy: cursor.getStringOptional(name: "created_by"), - completedBy: cursor.getStringOptional(name: "completed_by") - ) + Task.detached(priority: .high) { + do { + for try await todos in try self.db.watch( + sql: "SELECT * FROM \(TODOS_TABLE) WHERE list_id = ?", + parameters: [listId], + mapper: { cursor in + try Todo( + id: cursor.getString(name: "id"), + listId: cursor.getString(name: "list_id"), + photoId: cursor.getStringOptional(name: "photo_id"), + description: cursor.getString(name: "description"), + isComplete: cursor.getBoolean(name: "completed"), + createdAt: cursor.getString(name: "created_at"), + completedAt: cursor.getStringOptional(name: "completed_at"), + createdBy: cursor.getStringOptional(name: "created_by"), + completedBy: cursor.getStringOptional(name: "completed_by") + ) + } + ) { + callback(todos) } - ) { - callback(todos) + } catch { + print("Error in watch: \(error)") } - } catch { - print("Error in watch: \(error)") } } func insertTodo(_ todo: NewTodo, _ listId: String) async throws { - _ = try await self.db.execute( - sql: "INSERT INTO \(TODOS_TABLE) (id, created_at, created_by, description, list_id, completed) VALUES (uuid(), datetime(), ?, ?, ?, ?)", - parameters: [connector.currentUserID, todo.description, listId, todo.isComplete] - ) + try await Task.detached(priority: .userInitiated) { + _ = try await self.db.execute( + sql: "INSERT INTO \(TODOS_TABLE) (id, created_at, created_by, description, list_id, completed) VALUES (uuid(), datetime(), ?, ?, ?, ?)", + parameters: [self.connector.currentUserID, todo.description, listId, todo.isComplete] + ) + }.value + } + + + func insertManyTodos(listId: String, onProgress: @escaping (Double) -> Void = { _ in }, totalTodos: Int = 2000) async throws { + try await Task.detached(priority: .userInitiated) { + for i in 1...totalTodos { + let todo = NewTodo(listId: listId, isComplete: false, description: "Todo #\(i)") + try await self.insertTodo(todo, listId) + + let progress = Double(i) / Double(totalTodos) + await MainActor.run { + onProgress(progress) + } + } + }.value } func updateTodo(_ todo: Todo) async throws { - // Do this to avoid needing to handle date time from Swift to Kotlin - if(todo.isComplete) { - _ = try await self.db.execute( - sql: "UPDATE \(TODOS_TABLE) SET description = ?, completed = ?, completed_at = datetime(), completed_by = ? WHERE id = ?", - parameters: [todo.description, todo.isComplete, connector.currentUserID, todo.id] - ) - } else { - _ = try await self.db.execute( - sql: "UPDATE \(TODOS_TABLE) SET description = ?, completed = ?, completed_at = NULL, completed_by = NULL WHERE id = ?", - parameters: [todo.description, todo.isComplete, todo.id] - ) - } + try await Task.detached(priority: .userInitiated) { + if(todo.isComplete) { + _ = try await self.db.execute( + sql: "UPDATE \(TODOS_TABLE) SET description = ?, completed = ?, completed_at = datetime(), completed_by = ? WHERE id = ?", + parameters: [todo.description, todo.isComplete, self.connector.currentUserID, todo.id] + ) + } else { + _ = try await self.db.execute( + sql: "UPDATE \(TODOS_TABLE) SET description = ?, completed = ?, completed_at = NULL, completed_by = NULL WHERE id = ?", + parameters: [todo.description, todo.isComplete, todo.id] + ) + } + }.value } func deleteTodo(id: String) async throws { - _ = try await db.writeTransaction(callback: { transaction in - try transaction.execute( - sql: "DELETE FROM \(TODOS_TABLE) WHERE id = ?", - parameters: [id] - ) - }) + try await Task.detached(priority: .userInitiated) { + _ = try await self.db.writeTransaction(callback: { transaction in + try transaction.execute( + sql: "DELETE FROM \(TODOS_TABLE) WHERE id = ?", + parameters: [id] + ) + }) + }.value } } diff --git a/Demo/PowerSyncExample/Screens/HomeScreen.swift b/Demo/PowerSyncExample/Screens/HomeScreen.swift index 6a4da2f..524c739 100644 --- a/Demo/PowerSyncExample/Screens/HomeScreen.swift +++ b/Demo/PowerSyncExample/Screens/HomeScreen.swift @@ -6,29 +6,50 @@ struct HomeScreen: View { @Environment(SystemManager.self) private var system @Environment(AuthModel.self) private var authModel @Environment(NavigationModel.self) private var navigationModel + @State private var isSigningOut = false - - var body: some View { - + var body: some View { ListView() - .toolbar { - ToolbarItem(placement: .cancellationAction) { - Button("Sign out") { - Task { - try await system.signOut() - authModel.isAuthenticated = false - navigationModel.path = NavigationPath() + .toolbar { + ToolbarItem(placement: .cancellationAction) { + Button { + signOut() + } label: { + if isSigningOut { + HStack { + Text("Signing out") + ProgressView() + .progressViewStyle(CircularProgressViewStyle()) + .scaleEffect(0.7) + } + } else { + Text("Sign out") + } + } + .disabled(isSigningOut) + } + } + .task { + if(system.db.currentStatus.connected == false) { + await system.connect() } - } } - } - .task { - if(system.db.currentStatus.connected == false) { - await system.connect() - } - } - .navigationBarBackButtonHidden(true) + .navigationBarBackButtonHidden(true) + } + + private func signOut() { + Task { + isSigningOut = true + do { + try await system.signOut() + authModel.isAuthenticated = false + navigationModel.path = NavigationPath() + } catch { + print("Sign out error: \(error)") + } + isSigningOut = false } + } } #Preview { diff --git a/Package.resolved b/Package.resolved index 1330fb9..15ab07b 100644 --- a/Package.resolved +++ b/Package.resolved @@ -5,8 +5,8 @@ "kind" : "remoteSourceControl", "location" : "https://github.com/powersync-ja/powersync-kotlin.git", "state" : { - "revision" : "203db74889df8a20e3c6ac38aede6b0186d2e3b5", - "version" : "1.0.0-BETA23.0" + "revision" : "a683c43f9d432fdc9257626659c4da10f67ec8fb", + "version" : "1.0.0-BETA24.0" } }, { diff --git a/Package.swift b/Package.swift index 8ad0cd0..e770997 100644 --- a/Package.swift +++ b/Package.swift @@ -16,7 +16,7 @@ let package = Package( targets: ["PowerSync"]), ], dependencies: [ - .package(url: "https://github.com/powersync-ja/powersync-kotlin.git", exact: "1.0.0-BETA23.0"), + .package(url: "https://github.com/powersync-ja/powersync-kotlin.git", exact: "1.0.0-BETA24.0"), .package(url: "https://github.com/powersync-ja/powersync-sqlite-core-swift.git", "0.3.9"..<"0.4.0") ], targets: [ diff --git a/Tests/PowerSyncTests/Kotlin/KotlinPowerSyncDatabaseImplTests.swift b/Tests/PowerSyncTests/Kotlin/KotlinPowerSyncDatabaseImplTests.swift index ef2eaf1..a16baa4 100644 --- a/Tests/PowerSyncTests/Kotlin/KotlinPowerSyncDatabaseImplTests.swift +++ b/Tests/PowerSyncTests/Kotlin/KotlinPowerSyncDatabaseImplTests.swift @@ -244,11 +244,148 @@ final class KotlinPowerSyncDatabaseImplTests: XCTestCase { await fulfillment(of: [expectation], timeout: 5) watchTask.cancel() + let finalResults = await resultsStore.getResults() + print(finalResults) XCTAssertEqual(finalResults.count, 2) XCTAssertEqual(finalResults[1], ["User 1", "User 2"]) } + func testWatchTableChangesUsingTransaction() async throws { + let expectation = XCTestExpectation(description: "Watch changes") + + // Create an actor to handle concurrent mutations + actor ResultsStore { + private var results: [[String]] = [] + + func append(_ names: [String]) { + results.append(names) + } + + func getResults() -> [[String]] { + results + } + + func count() -> Int { + results.count + } + } + + let resultsStore = ResultsStore() + + let stream = try database.watch( + sql: "SELECT name FROM users ORDER BY id", + parameters: nil + ) { cursor in + cursor.getString(index: 0)! + } + + let watchTask = Task { + for try await names in stream { + await resultsStore.append(names) + if await resultsStore.count() == 2 { + expectation.fulfill() + } + } + } + + try await database.writeTransaction { transaction in + _ = try transaction.execute( + sql: "INSERT INTO users (id, name, email) VALUES (?, ?, ?)", + parameters: ["1", "User 1", "user1@example.com"] + ) + + _ = try transaction.execute( + sql: "INSERT INTO users (id, name, email) VALUES (?, ?, ?)", + parameters: ["2", "User 2", "user2@example.com"] + ) + } + + await fulfillment(of: [expectation], timeout: 5) + watchTask.cancel() + + + let finalResults = await resultsStore.getResults() + XCTAssertEqual(finalResults.count, 2) + XCTAssertEqual(finalResults[1], ["User 1", "User 2"]) + } + + func testWatchTableMoreComplexChanges() async throws { + let expectation = XCTestExpectation(description: "Watch changes") + + // Create an actor to handle concurrent mutations + actor ResultsStore { + private var results: [[String]] = [] + + func append(_ names: [String]) { + results.append(names) + } + + func getResults() -> [[String]] { + results + } + + func count() -> Int { + results.count + } + } + + let resultsStore = ResultsStore() + + let stream = try database.watch( + sql: "SELECT name FROM users ORDER BY id", + parameters: nil + ) { cursor in + cursor.getString(index: 0)! + } + + let watchTask = Task { + for try await names in stream { + await resultsStore.append(names) + if await resultsStore.count() == 2 { + expectation.fulfill() + } + } + } + + _ = try await database.execute( + sql: "INSERT INTO users (id, name, email) VALUES (?, ?, ?)", + parameters: ["1", "User 1", "user1@example.com"] + ) + + + _ = try await database.execute( + sql: "INSERT INTO users (id, name, email) VALUES (?, ?, ?)", + parameters: ["2", "User 2", "user2@example.com"] + ) + + _ = try await database.writeTransaction { transaction in + _ = try transaction.execute( + sql: "DELETE FROM users WHERE id = ?", + parameters: ["1"] + ) + } + + _ = try await database.execute( + sql: "DELETE FROM users WHERE id = ?", + parameters: ["2"] + ) + + _ = try await database.execute( + sql: "INSERT INTO users (id, name, email) VALUES (?, ?, ?)", + parameters: ["3", "User 3", "user3@example.com"] + ) + + await fulfillment(of: [expectation], timeout: 5) + watchTask.cancel() + + + let finalResults = await resultsStore.getResults() + print(finalResults) + XCTAssertEqual(finalResults.count, 2) + XCTAssertEqual(finalResults[1], ["User 3"]) + } + func testWatchError() async throws { do { let stream = try database.watch(