Skip to content

Fix deadlock when connecting after opening database #170

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

## 1.0.0-BETA30

* Fix a deadlock when calling `connect()` immediately after opening a database.
The issue has been introduced in version `1.0.0-BETA29`.

## 1.0.0-BETA29

* Fix potential race condition between jobs in `connect()` and `disconnect()`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,26 @@ class SyncIntegrationTest {
users shouldHaveSize amount
}

@Test
@OptIn(DelicateCoroutinesApi::class)
fun connectImmediately() =
databaseTest(createInitialDatabase = false) {
// Regression test for https://github.com/powersync-ja/powersync-kotlin/issues/169
val database = openDatabase()
database.connect(connector)

turbineScope(timeout = 10.0.seconds) {
val turbine = database.currentStatus.asFlow().testIn(this)
turbine.waitFor { it.connected }
turbine.cancel()
}
}

@Test
@OptIn(DelicateCoroutinesApi::class)
fun closesResponseStreamOnDatabaseClose() =
databaseTest {
val syncStream = database.syncStream()
database.connectInternal(syncStream, 1000L)
database.connect(connector)

turbineScope(timeout = 10.0.seconds) {
val turbine = database.currentStatus.asFlow().testIn(this)
Expand All @@ -58,8 +72,7 @@ class SyncIntegrationTest {
@OptIn(DelicateCoroutinesApi::class)
fun cleansResourcesOnDisconnect() =
databaseTest {
val syncStream = database.syncStream()
database.connectInternal(syncStream, 1000L)
database.connect(connector)

turbineScope(timeout = 10.0.seconds) {
val turbine = database.currentStatus.asFlow().testIn(this)
Expand All @@ -80,8 +93,7 @@ class SyncIntegrationTest {
@Test
fun cannotUpdateSchemaWhileConnected() =
databaseTest {
val syncStream = database.syncStream()
database.connectInternal(syncStream, 1000L)
database.connect(connector)

turbineScope(timeout = 10.0.seconds) {
val turbine = database.currentStatus.asFlow().testIn(this)
Expand All @@ -99,8 +111,7 @@ class SyncIntegrationTest {
@Test
fun testPartialSync() =
databaseTest {
val syncStream = database.syncStream()
database.connectInternal(syncStream, 1000L)
database.connect(connector)

val checksums =
buildList {
Expand Down Expand Up @@ -146,7 +157,7 @@ class SyncIntegrationTest {
}

turbineScope(timeout = 10.0.seconds) {
val turbine = syncStream.status.asFlow().testIn(this)
val turbine = database.currentStatus.asFlow().testIn(this)
turbine.waitFor { it.connected }
database.expectUserCount(0)

Expand Down Expand Up @@ -191,8 +202,7 @@ class SyncIntegrationTest {
@Test
fun testRemembersLastPartialSync() =
databaseTest {
val syncStream = database.syncStream()
database.connectInternal(syncStream, 1000L)
database.connect(connector)

syncLines.send(
SyncLine.FullCheckpoint(
Expand Down Expand Up @@ -228,8 +238,7 @@ class SyncIntegrationTest {
@Test
fun setsDownloadingState() =
databaseTest {
val syncStream = database.syncStream()
database.connectInternal(syncStream, 1000L)
database.connect(connector)

turbineScope(timeout = 10.0.seconds) {
val turbine = database.currentStatus.asFlow().testIn(this)
Expand Down Expand Up @@ -261,10 +270,9 @@ class SyncIntegrationTest {
fun setsConnectingState() =
databaseTest {
turbineScope(timeout = 10.0.seconds) {
val syncStream = database.syncStream()
val turbine = database.currentStatus.asFlow().testIn(this)

database.connectInternal(syncStream, 1000L)
database.connect(connector)
turbine.waitFor { it.connecting }

database.disconnect()
Expand All @@ -277,8 +285,7 @@ class SyncIntegrationTest {
@Test
fun testMultipleSyncsDoNotCreateMultipleStatusEntries() =
databaseTest {
val syncStream = database.syncStream()
database.connectInternal(syncStream, 1000L)
database.connect(connector)

turbineScope(timeout = 10.0.seconds) {
val turbine = database.currentStatus.asFlow().testIn(this)
Expand Down Expand Up @@ -324,7 +331,7 @@ class SyncIntegrationTest {

turbineScope(timeout = 10.0.seconds) {
// Connect the first database
database.connect(connector, 1000L)
database.connect(connector)
db2.connect(connector)

waitFor {
Expand All @@ -350,7 +357,7 @@ class SyncIntegrationTest {
val turbine2 = db2.currentStatus.asFlow().testIn(this)

// Connect the first database
database.connect(connector, 1000L)
database.connect(connector)

turbine1.waitFor { it.connecting }
db2.connect(connector)
Expand Down Expand Up @@ -414,7 +421,7 @@ class SyncIntegrationTest {
databaseTest {
val testConnector = TestConnector()
connector = testConnector
database.connectInternal(database.syncStream(), 1000L)
database.connect(testConnector)

suspend fun expectUserRows(amount: Int) {
val row = database.get("SELECT COUNT(*) FROM users") { it.getLong(0)!! }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,23 @@ import co.touchlab.kermit.Severity
import co.touchlab.kermit.TestConfig
import co.touchlab.kermit.TestLogWriter
import com.powersync.DatabaseDriverFactory
import com.powersync.PowerSyncDatabase
import com.powersync.bucket.WriteCheckpointData
import com.powersync.bucket.WriteCheckpointResponse
import com.powersync.connectors.PowerSyncBackendConnector
import com.powersync.connectors.PowerSyncCredentials
import com.powersync.createPowerSyncDatabaseImpl
import com.powersync.db.PowerSyncDatabaseImpl
import com.powersync.db.schema.Schema
import com.powersync.sync.SyncLine
import com.powersync.sync.SyncStream
import dev.mokkery.answering.returns
import dev.mokkery.everySuspend
import dev.mokkery.mock
import io.ktor.client.HttpClient
import io.ktor.client.HttpClientConfig
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.runTest
import kotlinx.io.files.Path
import kotlinx.serialization.json.JsonObject

expect val factory: DatabaseDriverFactory

Expand All @@ -43,18 +43,22 @@ fun generatePrintLogWriter() =
}
}

internal fun databaseTest(testBody: suspend ActiveDatabaseTest.() -> Unit) =
runTest {
val running = ActiveDatabaseTest(this)
internal fun databaseTest(
createInitialDatabase: Boolean = true,
testBody: suspend ActiveDatabaseTest.() -> Unit,
) = runTest {
val running = ActiveDatabaseTest(this)
if (createInitialDatabase) {
// Make sure the database is initialized, we're using internal APIs that expect initialization.
running.database = running.openDatabaseAndInitialize()
}

try {
running.testBody()
} finally {
running.cleanup()
}
try {
running.testBody()
} finally {
running.cleanup()
}
}

@OptIn(ExperimentalKermitApi::class)
internal class ActiveDatabaseTest(
Expand Down Expand Up @@ -104,32 +108,27 @@ internal class ActiveDatabaseTest(
fun openDatabase(): PowerSyncDatabaseImpl {
logger.d { "Opening database $databaseName in directory $testDirectory" }
val db =
PowerSyncDatabase(
createPowerSyncDatabaseImpl(
factory = factory,
schema = Schema(UserRow.table),
dbFilename = databaseName,
dbDirectory = testDirectory,
logger = logger,
scope = scope,
createClient = ::createClient,
)
doOnCleanup { db.close() }
return db as PowerSyncDatabaseImpl
return db
}

suspend fun openDatabaseAndInitialize(): PowerSyncDatabaseImpl = openDatabase().also { it.readLock { } }

fun PowerSyncDatabase.syncStream(): SyncStream {
val client = MockSyncService(syncLines) { checkpointResponse() }
return SyncStream(
bucketStorage = database.bucketStorage,
connector = connector,
httpEngine = client,
uploadCrud = { connector.uploadData(this) },
retryDelayMs = 10,
logger = logger,
params = JsonObject(emptyMap()),
scope = scope,
)
private fun createClient(config: HttpClientConfig<*>.() -> Unit): HttpClient {
val engine = MockSyncService(syncLines) { checkpointResponse() }

return HttpClient(engine) {
config()
}
}

fun doOnCleanup(action: suspend () -> Unit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import co.touchlab.kermit.Logger
import co.touchlab.skie.configuration.annotations.DefaultArgumentInterop
import com.powersync.db.PowerSyncDatabaseImpl
import com.powersync.db.schema.Schema
import com.powersync.sync.SyncStream
import com.powersync.utils.generateLogger
import io.ktor.client.HttpClient
import io.ktor.client.HttpClientConfig
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
Expand All @@ -30,7 +33,7 @@ public fun PowerSyncDatabase(
): PowerSyncDatabase {
val generatedLogger: Logger = generateLogger(logger)

return PowerSyncDatabaseImpl(
return createPowerSyncDatabaseImpl(
schema = schema,
factory = factory,
dbFilename = dbFilename,
Expand All @@ -39,3 +42,22 @@ public fun PowerSyncDatabase(
dbDirectory = dbDirectory,
)
}

internal fun createPowerSyncDatabaseImpl(
factory: DatabaseDriverFactory,
schema: Schema,
dbFilename: String,
scope: CoroutineScope,
logger: Logger,
dbDirectory: String?,
createClient: (HttpClientConfig<*>.() -> Unit) -> HttpClient = SyncStream::defaultHttpClient,
): PowerSyncDatabaseImpl =
PowerSyncDatabaseImpl(
schema = schema,
factory = factory,
dbFilename = dbFilename,
scope = scope,
logger = logger,
dbDirectory = dbDirectory,
createClient = createClient,
)
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import com.powersync.utils.JsonParam
import com.powersync.utils.JsonUtil
import com.powersync.utils.throttle
import com.powersync.utils.toJsonObject
import io.ktor.client.HttpClient
import io.ktor.client.HttpClientConfig
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.FlowPreview
Expand Down Expand Up @@ -61,6 +63,7 @@ internal class PowerSyncDatabaseImpl(
private val dbFilename: String,
private val dbDirectory: String? = null,
val logger: Logger = Logger,
private val createClient: (HttpClientConfig<*>.() -> Unit) -> HttpClient,
) : PowerSyncDatabase {
companion object {
internal val streamConflictMessage =
Expand Down Expand Up @@ -148,22 +151,25 @@ internal class PowerSyncDatabaseImpl(
crudThrottleMs: Long,
retryDelayMs: Long,
params: Map<String, JsonParam?>,
) = mutex.withLock {
) {
waitReady()
disconnectInternal()

connectInternal(
SyncStream(
bucketStorage = bucketStorage,
connector = connector,
uploadCrud = suspend { connector.uploadData(this) },
retryDelayMs = retryDelayMs,
logger = logger,
params = params.toJsonObject(),
scope = scope,
),
crudThrottleMs,
)
mutex.withLock {
disconnectInternal()

connectInternal(
SyncStream(
bucketStorage = bucketStorage,
connector = connector,
uploadCrud = suspend { connector.uploadData(this) },
retryDelayMs = retryDelayMs,
logger = logger,
params = params.toJsonObject(),
scope = scope,
createClient = createClient,
),
crudThrottleMs,
)
}
}

@OptIn(FlowPreview::class)
Expand Down
Loading