Skip to content

Rust client: Schedule crud uploads after connecting #210

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

* [Supabase Connector] Fixed issue where only `400` HTTP status code errors where reported as connection errors. The connector now reports errors for codes `>=400`.
* Update PowerSync core extension to `0.4.1`, fixing an issue with the new Rust client.
* Rust sync client: Fix writes made while offline not being uploaded reliably.

## 1.2.0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,65 @@ abstract class BaseSyncIntegrationTest(
database.expectUserCount(2)
}

@Test
fun `handles write made while offline`() =
databaseTest {
connector = TestConnector()
val uploadCompleted = CompletableDeferred<Unit>()
checkpointResponse = {
uploadCompleted.complete(Unit)
WriteCheckpointResponse(WriteCheckpointData("1"))
}

database.execute("INSERT INTO users (id, name) VALUES (uuid(), ?)", listOf("local write"))
database.connect(connector, options = options)

turbineScope(timeout = 10.0.seconds) {
val turbine = database.currentStatus.asFlow().testIn(scope)
turbine.waitFor { it.connected }

val query = database.watch("SELECT name FROM users") { it.getString(0)!! }.testIn(scope)
query.awaitItem() shouldBe listOf("local write")

syncLines.send(SyncLine.KeepAlive(tokenExpiresIn = 1234))
syncLines.send(
SyncLine.FullCheckpoint(
Checkpoint(
writeCheckpoint = "1",
lastOpId = "1",
checksums = listOf(BucketChecksum("a", checksum = 0)),
),
),
)
syncLines.send(
SyncLine.SyncDataBucket(
bucket = "a",
data =
listOf(
OplogEntry(
checksum = 0,
opId = "1",
op = OpType.PUT,
rowId = "1",
rowType = "users",
data = """{"id": "test1", "name": "from server"}""",
),
),
after = null,
nextAfter = null,
),
)

uploadCompleted.await()
syncLines.send(SyncLine.CheckpointComplete("1"))

query.awaitItem() shouldBe listOf("from server")

turbine.cancelAndIgnoreRemainingEvents()
query.cancelAndIgnoreRemainingEvents()
}
}

@Test
fun testTokenExpired() =
databaseTest {
Expand Down
37 changes: 30 additions & 7 deletions core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import kotlinx.datetime.Clock
import kotlinx.serialization.Serializable
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.JsonElement
import kotlinx.serialization.json.JsonObject
import kotlinx.serialization.json.encodeToJsonElement
Expand Down Expand Up @@ -133,6 +132,7 @@ internal class SyncStream(
uploadAllCrud()
} finally {
if (holdingUploadLock) {
logger.v { "crud upload: notify completion" }
completedCrudUploads.send(Unit)
isUploadingCrud.set(null)
}
Expand Down Expand Up @@ -297,6 +297,7 @@ internal class SyncStream(
*/
private inner class ActiveIteration(
val scope: CoroutineScope,
var hadSyncLine: Boolean = false,
var fetchLinesJob: Job? = null,
var credentialsInvalidation: Job? = null,
) {
Expand Down Expand Up @@ -342,6 +343,8 @@ internal class SyncStream(
fetchLinesJob =
scope.launch {
launch {
logger.v { "listening for completed uploads" }

for (completion in completedCrudUploads) {
control("completed_upload")
}
Expand Down Expand Up @@ -406,16 +409,36 @@ internal class SyncStream(
}
}

/**
* Triggers a crud upload when called for the first time.
*
* We could have pending local writes made while disconnected, so in addition to listening
* on updates to `ps_crud`, we also need to trigger a CRUD upload in some other cases. We
* do this on the first sync line because the client is likely to be online in that case.
*/
private fun triggerCrudUploadIfFirstLine() {
if (!hadSyncLine) {
triggerCrudUploadAsync()
hadSyncLine = true
}
}

private suspend fun line(text: String) {
triggerCrudUploadIfFirstLine()
control("line_text", text)
}

private suspend fun line(blob: ByteArray) {
triggerCrudUploadIfFirstLine()
control("line_binary", blob)
}

private suspend fun connect(start: Instruction.EstablishSyncStream) {
when (val method = options.method) {
ConnectionMethod.Http ->
connectViaHttp(start.request).collect { rawLine ->
control("line_text", rawLine)
}
connectViaHttp(start.request).collect(this::line)
is ConnectionMethod.WebSocket ->
connectViaWebSocket(start.request, method).collect { binaryLine ->
control("line_binary", binaryLine)
}
connectViaWebSocket(start.request, method).collect(this::line)
}
}
}
Expand Down