Skip to content

Commit 015988c

Browse files
authored
Rust client: Schedule crud uploads after connecting (#210)
Rust client: Schedule crud uploads after connecting
1 parent 9c664a6 commit 015988c

File tree

3 files changed

+90
-7
lines changed

3 files changed

+90
-7
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
* [Supabase Connector] Fixed issue where only `400` HTTP status code errors where reported as connection errors. The connector now reports errors for codes `>=400`.
66
* Update PowerSync core extension to `0.4.1`, fixing an issue with the new Rust client.
7+
* Rust sync client: Fix writes made while offline not being uploaded reliably.
78

89
## 1.2.0
910

core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -563,6 +563,65 @@ abstract class BaseSyncIntegrationTest(
563563
database.expectUserCount(2)
564564
}
565565

566+
@Test
567+
fun `handles write made while offline`() =
568+
databaseTest {
569+
connector = TestConnector()
570+
val uploadCompleted = CompletableDeferred<Unit>()
571+
checkpointResponse = {
572+
uploadCompleted.complete(Unit)
573+
WriteCheckpointResponse(WriteCheckpointData("1"))
574+
}
575+
576+
database.execute("INSERT INTO users (id, name) VALUES (uuid(), ?)", listOf("local write"))
577+
database.connect(connector, options = options)
578+
579+
turbineScope(timeout = 10.0.seconds) {
580+
val turbine = database.currentStatus.asFlow().testIn(scope)
581+
turbine.waitFor { it.connected }
582+
583+
val query = database.watch("SELECT name FROM users") { it.getString(0)!! }.testIn(scope)
584+
query.awaitItem() shouldBe listOf("local write")
585+
586+
syncLines.send(SyncLine.KeepAlive(tokenExpiresIn = 1234))
587+
syncLines.send(
588+
SyncLine.FullCheckpoint(
589+
Checkpoint(
590+
writeCheckpoint = "1",
591+
lastOpId = "1",
592+
checksums = listOf(BucketChecksum("a", checksum = 0)),
593+
),
594+
),
595+
)
596+
syncLines.send(
597+
SyncLine.SyncDataBucket(
598+
bucket = "a",
599+
data =
600+
listOf(
601+
OplogEntry(
602+
checksum = 0,
603+
opId = "1",
604+
op = OpType.PUT,
605+
rowId = "1",
606+
rowType = "users",
607+
data = """{"id": "test1", "name": "from server"}""",
608+
),
609+
),
610+
after = null,
611+
nextAfter = null,
612+
),
613+
)
614+
615+
uploadCompleted.await()
616+
syncLines.send(SyncLine.CheckpointComplete("1"))
617+
618+
query.awaitItem() shouldBe listOf("from server")
619+
620+
turbine.cancelAndIgnoreRemainingEvents()
621+
query.cancelAndIgnoreRemainingEvents()
622+
}
623+
}
624+
566625
@Test
567626
fun testTokenExpired() =
568627
databaseTest {

core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ import kotlinx.coroutines.launch
4848
import kotlinx.coroutines.withContext
4949
import kotlinx.datetime.Clock
5050
import kotlinx.serialization.Serializable
51-
import kotlinx.serialization.encodeToString
5251
import kotlinx.serialization.json.JsonElement
5352
import kotlinx.serialization.json.JsonObject
5453
import kotlinx.serialization.json.encodeToJsonElement
@@ -133,6 +132,7 @@ internal class SyncStream(
133132
uploadAllCrud()
134133
} finally {
135134
if (holdingUploadLock) {
135+
logger.v { "crud upload: notify completion" }
136136
completedCrudUploads.send(Unit)
137137
isUploadingCrud.set(null)
138138
}
@@ -297,6 +297,7 @@ internal class SyncStream(
297297
*/
298298
private inner class ActiveIteration(
299299
val scope: CoroutineScope,
300+
var hadSyncLine: Boolean = false,
300301
var fetchLinesJob: Job? = null,
301302
var credentialsInvalidation: Job? = null,
302303
) {
@@ -342,6 +343,8 @@ internal class SyncStream(
342343
fetchLinesJob =
343344
scope.launch {
344345
launch {
346+
logger.v { "listening for completed uploads" }
347+
345348
for (completion in completedCrudUploads) {
346349
control("completed_upload")
347350
}
@@ -406,16 +409,36 @@ internal class SyncStream(
406409
}
407410
}
408411

412+
/**
413+
* Triggers a crud upload when called for the first time.
414+
*
415+
* We could have pending local writes made while disconnected, so in addition to listening
416+
* on updates to `ps_crud`, we also need to trigger a CRUD upload in some other cases. We
417+
* do this on the first sync line because the client is likely to be online in that case.
418+
*/
419+
private fun triggerCrudUploadIfFirstLine() {
420+
if (!hadSyncLine) {
421+
triggerCrudUploadAsync()
422+
hadSyncLine = true
423+
}
424+
}
425+
426+
private suspend fun line(text: String) {
427+
triggerCrudUploadIfFirstLine()
428+
control("line_text", text)
429+
}
430+
431+
private suspend fun line(blob: ByteArray) {
432+
triggerCrudUploadIfFirstLine()
433+
control("line_binary", blob)
434+
}
435+
409436
private suspend fun connect(start: Instruction.EstablishSyncStream) {
410437
when (val method = options.method) {
411438
ConnectionMethod.Http ->
412-
connectViaHttp(start.request).collect { rawLine ->
413-
control("line_text", rawLine)
414-
}
439+
connectViaHttp(start.request).collect(this::line)
415440
is ConnectionMethod.WebSocket ->
416-
connectViaWebSocket(start.request, method).collect { binaryLine ->
417-
control("line_binary", binaryLine)
418-
}
441+
connectViaWebSocket(start.request, method).collect(this::line)
419442
}
420443
}
421444
}

0 commit comments

Comments
 (0)