Skip to content

Commit

Permalink
feat: show sync retry state for internal build #WPB-11198 (#3062)
Browse files Browse the repository at this point in the history
* feat: show sync retry state for internal build #WPB-11198

* fix tests - new param
  • Loading branch information
damian-kaczmarek authored Oct 22, 2024
1 parent a2f0ead commit 5e8bfaf
Show file tree
Hide file tree
Showing 12 changed files with 69 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package com.wire.kalium.logic.data.sync

import com.wire.kalium.logic.CoreFailure
import kotlin.time.Duration

sealed interface IncrementalSyncStatus {

Expand All @@ -34,8 +35,8 @@ sealed interface IncrementalSyncStatus {
override fun toString() = "LIVE"
}

data class Failed(val failure: CoreFailure) : IncrementalSyncStatus {
override fun toString() = "FAILED, cause: '$failure'"
data class Failed(val failure: CoreFailure, val retryDelay: Duration) : IncrementalSyncStatus {
override fun toString() = "FAILED, cause: '$failure' retry in: $retryDelay"
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package com.wire.kalium.logic.data.sync

import com.wire.kalium.logic.CoreFailure
import kotlin.time.Duration

sealed interface SlowSyncStatus {

Expand All @@ -28,7 +29,7 @@ sealed interface SlowSyncStatus {

data class Ongoing(val currentStep: SlowSyncStep) : SlowSyncStatus

data class Failed(val failure: CoreFailure) : SlowSyncStatus
data class Failed(val failure: CoreFailure, val retryDelay: Duration) : SlowSyncStatus
}

enum class SlowSyncStep {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package com.wire.kalium.logic.data.sync

import com.wire.kalium.logic.CoreFailure
import kotlin.time.Duration

sealed class SyncState {

Expand Down Expand Up @@ -53,6 +54,7 @@ sealed class SyncState {

/**
* Sync was not completed due to a failure.
* [retryDelay] specifies the duration in which next try will happen
*/
data class Failed(val cause: CoreFailure) : SyncState()
data class Failed(val cause: CoreFailure, val retryDelay: Duration) : SyncState()
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ internal class ObserveSyncStateUseCaseImpl internal constructor(
override operator fun invoke(): Flow<SyncState> =
combine(slowSyncRepository.slowSyncStatus, incrementalSyncRepository.incrementalSyncState) { slowStatus, incrementalStatus ->
when (slowStatus) {
is SlowSyncStatus.Failed -> SyncState.Failed(slowStatus.failure)
is SlowSyncStatus.Failed -> SyncState.Failed(slowStatus.failure, slowStatus.retryDelay)
is SlowSyncStatus.Ongoing -> SyncState.SlowSync
SlowSyncStatus.Pending -> SyncState.Waiting
SlowSyncStatus.Complete -> {
when (incrementalStatus) {
IncrementalSyncStatus.Live -> SyncState.Live
is IncrementalSyncStatus.Failed -> SyncState.Failed(incrementalStatus.failure)
is IncrementalSyncStatus.Failed -> SyncState.Failed(incrementalStatus.failure, incrementalStatus.retryDelay)
IncrementalSyncStatus.FetchingPendingEvents -> SyncState.GatheringPendingEvents
IncrementalSyncStatus.Pending -> SyncState.GatheringPendingEvents
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,12 @@ internal class IncrementalSyncManager(
onFailure = { failure ->
logger.i("$TAG ExceptionHandler error $failure")
syncScope.launch {
incrementalSyncRepository.updateIncrementalSyncState(IncrementalSyncStatus.Failed(failure))
val delay = exponentialDurationHelper.next()
incrementalSyncRepository.updateIncrementalSyncState(
IncrementalSyncStatus.Failed(failure, delay)
)

incrementalSyncRecoveryHandler.recover(failure = failure) {
val delay = exponentialDurationHelper.next()
logger.i("$TAG Triggering delay($delay) and waiting for reconnection")
delayUntilConnectedOrPolicyUpgrade(delay)
logger.i("$TAG Delay and waiting for connection finished - retrying")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,15 @@ internal class SlowSyncManager(
private val syncMigrationStepsProvider: () -> SyncMigrationStepsProvider,
logger: KaliumLogger = kaliumLogger,
kaliumDispatcher: KaliumDispatcher = KaliumDispatcherImpl,
private val exponentialDurationHelper: ExponentialDurationHelper = ExponentialDurationHelperImpl(MIN_RETRY_DELAY, MAX_RETRY_DELAY)
private val exponentialDurationHelper: ExponentialDurationHelper = ExponentialDurationHelperImpl(
MIN_RETRY_DELAY,
MAX_RETRY_DELAY
)
) {

@OptIn(ExperimentalCoroutinesApi::class)
private val scope = CoroutineScope(SupervisorJob() + kaliumDispatcher.default.limitedParallelism(1))
private val scope =
CoroutineScope(SupervisorJob() + kaliumDispatcher.default.limitedParallelism(1))
private val logger = logger.withFeatureId(SYNC)

private val coroutineExceptionHandler = SyncExceptionHandler(
Expand All @@ -84,9 +88,9 @@ internal class SlowSyncManager(
onFailure = { failure ->
logger.i("SlowSync ExceptionHandler error $failure")
scope.launch {
slowSyncRepository.updateSlowSyncStatus(SlowSyncStatus.Failed(failure))
val delay = exponentialDurationHelper.next()
slowSyncRepository.updateSlowSyncStatus(SlowSyncStatus.Failed(failure, delay))
slowSyncRecoveryHandler.recover(failure) {
val delay = exponentialDurationHelper.next()
logger.i("SlowSync Triggering delay($delay) and waiting for reconnection")
networkStateObserver.delayUntilConnectedWithInternetAgain(delay)
logger.i("SlowSync Delay and waiting for connection finished - retrying")
Expand All @@ -101,30 +105,34 @@ internal class SlowSyncManager(
startMonitoring()
}

private suspend fun isSlowSyncNeededFlow(): Flow<SlowSyncParam> = slowSyncRepository.observeLastSlowSyncCompletionInstant()
.map { latestSlowSync ->
logger.i("Last SlowSync was performed on '$latestSlowSync'")
val lastVersion = slowSyncRepository.getSlowSyncVersion()
when {
(lastVersion != null) && (CURRENT_VERSION > lastVersion) -> {
logger.i("Last saved SlowSync version is $lastVersion, current is $CURRENT_VERSION")
SlowSyncParam.MigrationNeeded(oldVersion = lastVersion, newVersion = CURRENT_VERSION)
}
private suspend fun isSlowSyncNeededFlow(): Flow<SlowSyncParam> =
slowSyncRepository.observeLastSlowSyncCompletionInstant()
.map { latestSlowSync ->
logger.i("Last SlowSync was performed on '$latestSlowSync'")
val lastVersion = slowSyncRepository.getSlowSyncVersion()
when {
(lastVersion != null) && (CURRENT_VERSION > lastVersion) -> {
logger.i("Last saved SlowSync version is $lastVersion, current is $CURRENT_VERSION")
SlowSyncParam.MigrationNeeded(
oldVersion = lastVersion,
newVersion = CURRENT_VERSION
)
}

latestSlowSync == null -> {
SlowSyncParam.NotPerformedBefore
}
latestSlowSync == null -> {
SlowSyncParam.NotPerformedBefore
}

DateTimeUtil.currentInstant() > (latestSlowSync + MIN_TIME_BETWEEN_SLOW_SYNCS) -> {
logger.i("Slow sync too old - last slow sync was performed on '$latestSlowSync'")
SlowSyncParam.LastSlowSyncTooOld
}
DateTimeUtil.currentInstant() > (latestSlowSync + MIN_TIME_BETWEEN_SLOW_SYNCS) -> {
logger.i("Slow sync too old - last slow sync was performed on '$latestSlowSync'")
SlowSyncParam.LastSlowSyncTooOld
}

else -> {
SlowSyncParam.Success
else -> {
SlowSyncParam.Success
}
}
}
}

private fun startMonitoring() {
scope.launch(coroutineExceptionHandler) {
Expand All @@ -139,7 +147,10 @@ internal class SlowSyncManager(
}
}

private suspend fun handleCriteriaResolution(syncCriteriaResolution: SyncCriteriaResolution, isSlowSyncNeeded: SlowSyncParam) {
private suspend fun handleCriteriaResolution(
syncCriteriaResolution: SyncCriteriaResolution,
isSlowSyncNeeded: SlowSyncParam
) {
if (syncCriteriaResolution is SyncCriteriaResolution.Ready) {
// START SYNC IF NEEDED
logger.i("SlowSync criteria ready, checking if SlowSync is needed or already performed")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertContentEquals
import kotlin.test.assertEquals
import kotlin.time.Duration
import kotlin.time.Duration.Companion.days
import kotlin.time.Duration.Companion.seconds

Expand Down Expand Up @@ -87,7 +88,7 @@ class IncrementalSyncRepositoryTest {
IncrementalSyncStatus.FetchingPendingEvents,
IncrementalSyncStatus.Live,
IncrementalSyncStatus.Pending,
IncrementalSyncStatus.Failed(NetworkFailure.NoNetworkConnection(null)),
IncrementalSyncStatus.Failed(NetworkFailure.NoNetworkConnection(null), Duration.ZERO),
IncrementalSyncStatus.FetchingPendingEvents
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import kotlinx.datetime.Instant
import kotlin.test.Test
import kotlin.test.assertContentEquals
import kotlin.test.assertEquals
import kotlin.time.Duration
import kotlin.time.Duration.Companion.days

class GetNotificationsUseCaseTest {
Expand Down Expand Up @@ -192,7 +193,7 @@ class GetNotificationsUseCaseTest {
arrange.notificationEventsManager.observeEphemeralNotifications()
}.wasInvoked(exactly = once)

syncStatusFlow.emit(IncrementalSyncStatus.Failed(CoreFailure.Unknown(null)))
syncStatusFlow.emit(IncrementalSyncStatus.Failed(CoreFailure.Unknown(null), Duration.ZERO))

coVerify {
arrange.messageRepository.getNotificationMessage(any())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import io.mockative.once
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.time.Duration

class AvsSyncStateReporterTest {

Expand Down Expand Up @@ -124,7 +125,7 @@ class AvsSyncStateReporterTest {
fun withFailedIncrementalSyncState() = apply {
every {
incrementalSyncRepository.incrementalSyncState
}.returns(flowOf(IncrementalSyncStatus.Failed(CoreFailure.SyncEventOrClientNotFound)))
}.returns(flowOf(IncrementalSyncStatus.Failed(CoreFailure.SyncEventOrClientNotFound, Duration.ZERO)))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import com.wire.kalium.logic.data.sync.SlowSyncStep
import com.wire.kalium.logic.data.sync.SyncState
import com.wire.kalium.logic.test_util.TestKaliumDispatcher
import io.mockative.Mock
import io.mockative.coEvery
import io.mockative.every
import io.mockative.mock
import kotlinx.coroutines.flow.MutableStateFlow
Expand All @@ -37,6 +36,7 @@ import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.time.Duration

class ObserveSyncStateUseCaseTest {

Expand All @@ -49,7 +49,7 @@ class ObserveSyncStateUseCaseTest {

useCase().test {
val item = awaitItem()
assertEquals(SyncState.Failed(coreFailure), item)
assertEquals(SyncState.Failed(coreFailure, Duration.ZERO), item)
}
}

Expand Down Expand Up @@ -102,7 +102,7 @@ class ObserveSyncStateUseCaseTest {

useCase().test {
val item = awaitItem()
assertEquals(SyncState.Failed(coreFailure), item)
assertEquals(SyncState.Failed(coreFailure, Duration.ZERO), item)
}
}

Expand Down Expand Up @@ -198,7 +198,7 @@ class ObserveSyncStateUseCaseTest {

companion object {
val coreFailure = CoreFailure.Unknown(null)
val slowSyncFailureFlow = MutableStateFlow(SlowSyncStatus.Failed(coreFailure)).asStateFlow()
val incrementalSyncFailureFlow = flowOf(IncrementalSyncStatus.Failed(coreFailure))
val slowSyncFailureFlow = MutableStateFlow(SlowSyncStatus.Failed(coreFailure, Duration.ZERO)).asStateFlow()
val incrementalSyncFailureFlow = flowOf(IncrementalSyncStatus.Failed(coreFailure, Duration.ZERO))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ package com.wire.kalium.logic.sync
import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.data.session.SessionRepository
import com.wire.kalium.logic.data.sync.InMemoryIncrementalSyncRepository
import com.wire.kalium.logic.data.sync.SlowSyncRepositoryImpl
import com.wire.kalium.logic.data.sync.IncrementalSyncRepository
import com.wire.kalium.logic.data.sync.IncrementalSyncStatus
import com.wire.kalium.logic.data.sync.SlowSyncRepository
import com.wire.kalium.logic.data.sync.SlowSyncRepositoryImpl
import com.wire.kalium.logic.data.sync.SlowSyncStatus
import com.wire.kalium.logic.data.sync.SlowSyncStep
import com.wire.kalium.logic.util.shouldFail
Expand All @@ -40,14 +40,15 @@ import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertFalse
import kotlin.test.assertTrue
import kotlin.time.Duration

@OptIn(ExperimentalCoroutinesApi::class)
class SyncManagerTest {

@Test
fun givenSlowSyncFailed_whenWaitingUntilLiveOrFailure_thenShouldReturnFailure() = runTest {
val (arrangement, syncManager) = Arrangement().arrange()
arrangement.slowSyncRepository.updateSlowSyncStatus(SlowSyncStatus.Failed(CoreFailure.MissingClientRegistration))
arrangement.slowSyncRepository.updateSlowSyncStatus(SlowSyncStatus.Failed(CoreFailure.MissingClientRegistration, Duration.ZERO))
arrangement.incrementalSyncRepository.updateIncrementalSyncState(IncrementalSyncStatus.Pending)

val result = syncManager.waitUntilLiveOrFailure()
Expand All @@ -59,7 +60,7 @@ class SyncManagerTest {
fun givenIncrementalSyncFailedAndSlowSyncIsComplete_whenWaitingUntilLiveOrFailure_thenShouldReturnFailure() = runTest {
val (arrangement, syncManager) = Arrangement().arrange()
arrangement.slowSyncRepository.updateSlowSyncStatus(SlowSyncStatus.Complete)
val failedState = IncrementalSyncStatus.Failed(CoreFailure.MissingClientRegistration)
val failedState = IncrementalSyncStatus.Failed(CoreFailure.MissingClientRegistration, Duration.ZERO)
arrangement.incrementalSyncRepository.updateIncrementalSyncState(failedState)

val result = syncManager.waitUntilLiveOrFailure()
Expand All @@ -79,7 +80,7 @@ class SyncManagerTest {
advanceUntilIdle()
assertTrue { result.isActive }

arrangement.slowSyncRepository.updateSlowSyncStatus(SlowSyncStatus.Failed(CoreFailure.MissingClientRegistration))
arrangement.slowSyncRepository.updateSlowSyncStatus(SlowSyncStatus.Failed(CoreFailure.MissingClientRegistration, Duration.ZERO))
advanceUntilIdle()
result.await().shouldFail()
}
Expand All @@ -97,7 +98,7 @@ class SyncManagerTest {
assertTrue { result.isActive }

arrangement.slowSyncRepository.updateSlowSyncStatus(SlowSyncStatus.Complete)
val failure = IncrementalSyncStatus.Failed(CoreFailure.MissingClientRegistration)
val failure = IncrementalSyncStatus.Failed(CoreFailure.MissingClientRegistration, Duration.ZERO)
arrangement.incrementalSyncRepository.updateIncrementalSyncState(failure)
advanceUntilIdle()
result.await().shouldFail()
Expand Down Expand Up @@ -144,7 +145,7 @@ class SyncManagerTest {
advanceUntilIdle()
assertTrue { result.isActive }

val failure = IncrementalSyncStatus.Failed(CoreFailure.MissingClientRegistration)
val failure = IncrementalSyncStatus.Failed(CoreFailure.MissingClientRegistration, Duration.ZERO)
arrangement.incrementalSyncRepository.updateIncrementalSyncState(failure)
advanceUntilIdle()
assertTrue { result.isCompleted }
Expand All @@ -155,7 +156,7 @@ class SyncManagerTest {
@Test
fun givenSlowSyncFailed_whenWaitingUntilStartedOrFailure_thenShouldReturnFailure() = runTest {
val (arrangement, syncManager) = Arrangement().arrange()
arrangement.slowSyncRepository.updateSlowSyncStatus(SlowSyncStatus.Failed(CoreFailure.MissingClientRegistration))
arrangement.slowSyncRepository.updateSlowSyncStatus(SlowSyncStatus.Failed(CoreFailure.MissingClientRegistration, Duration.ZERO))

val result = syncManager.waitUntilStartedOrFailure()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ interface NetworkStateObserver {
// Delay which will be completed earlier if there is a reconnection in the meantime.
suspend fun delayUntilConnectedWithInternetAgain(delay: Duration) {
// Delay for given amount but break it if reconnected again.
kaliumUtilLogger.i("$TAG delayUntilConnectedWithInternetAgain")
kaliumUtilLogger.i("$TAG delayUntilConnectedWithInternetAgain $delay")
withTimeoutOrNull(delay) {
// Drop the current value, so it will complete only if the connection changed again to connected during that time.
observeNetworkState()
Expand Down

0 comments on commit 5e8bfaf

Please sign in to comment.