Skip to content

Commit

Permalink
fix: Ignore ConnectionRequest on backend disabled RC [WPB-7087] (#2687)
Browse files Browse the repository at this point in the history
* fix: Ignore ConnectionRequest on backend disabled RC

* Fix code-style
  • Loading branch information
borichellow authored Apr 2, 2024
1 parent 65dd247 commit f2fe483
Show file tree
Hide file tree
Showing 9 changed files with 156 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ import com.wire.kalium.logic.di.MapperProvider
import com.wire.kalium.logic.failure.InvalidMappingFailure
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.flatMap
import com.wire.kalium.logic.functional.flatMapLeft
import com.wire.kalium.logic.functional.isRight
import com.wire.kalium.logic.functional.left
import com.wire.kalium.logic.functional.map
import com.wire.kalium.logic.functional.onFailure
import com.wire.kalium.logic.functional.onSuccess
Expand Down Expand Up @@ -75,6 +77,7 @@ interface ConnectionRepository {
suspend fun setConnectionAsNotified(userId: UserId)
suspend fun setAllConnectionsAsNotified()
suspend fun deleteConnection(connection: Connection): Either<StorageFailure, Unit>
suspend fun ignoreConnectionRequest(userId: UserId): Either<CoreFailure, Unit>
}

@Suppress("LongParameterList", "TooManyFunctions")
Expand Down Expand Up @@ -125,22 +128,36 @@ internal class ConnectionDataSource(
}.map { }
}

override suspend fun updateConnectionStatus(userId: UserId, connectionState: ConnectionState): Either<CoreFailure, Connection> {
private suspend fun updateRemoteConnectionStatus(userId: UserId, connectionState: ConnectionState): Either<CoreFailure, ConnectionDTO> {
val isValidConnectionState = isValidConnectionState(connectionState)
val newConnectionStatus = connectionStatusMapper.toApiModel(connectionState)
if (!isValidConnectionState || newConnectionStatus == null) {
return Either.Left(InvalidMappingFailure)
}

return wrapApiRequest {
connectionApi.updateConnection(userId.toApi(), newConnectionStatus)
}.map { connectionDTO ->
val connectionStatus = connectionDTO.copy(status = newConnectionStatus)
return wrapApiRequest { connectionApi.updateConnection(userId.toApi(), newConnectionStatus) }
}

override suspend fun updateConnectionStatus(userId: UserId, connectionState: ConnectionState): Either<CoreFailure, Connection> =
updateRemoteConnectionStatus(userId, connectionState).map { connectionDTO ->
val connectionStatus = connectionDTO.copy(status = connectionStatusMapper.toApiModel(connectionState)!!)
val connectionModel = connectionMapper.fromApiToModel(connectionDTO)
handleUserConnectionStatusPersistence(connectionMapper.fromApiToModel(connectionStatus))
connectionModel
}
}

override suspend fun ignoreConnectionRequest(userId: UserId): Either<CoreFailure, Unit> =
updateRemoteConnectionStatus(userId, IGNORED)
.flatMapLeft {
if (it is NetworkFailure.FederatedBackendFailure.FailedDomains)
wrapStorageRequest { connectionDAO.getConnectionByUser(userId.toDao()) }
.map { connectionEntity ->
val updatedConnection = connectionMapper.fromDaoToModel(connectionEntity).copy(status = IGNORED)
handleUserConnectionStatusPersistence(updatedConnection)
}
else it.left()
}
.map { Unit }

/**
* Check if we can transition to the correct connection status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package com.wire.kalium.logic.feature.connection

import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.data.connection.ConnectionRepository
import com.wire.kalium.logic.data.user.ConnectionState
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.functional.fold
import com.wire.kalium.logic.kaliumLogger
Expand All @@ -43,7 +42,7 @@ internal class IgnoreConnectionRequestUseCaseImpl(
) : IgnoreConnectionRequestUseCase {

override suspend fun invoke(userId: UserId): IgnoreConnectionRequestUseCaseResult {
return connectionRepository.updateConnectionStatus(userId, ConnectionState.IGNORED)
return connectionRepository.ignoreConnectionRequest(userId)
.fold({
kaliumLogger.e("An error occurred when ignoring the connection request to $userId")
IgnoreConnectionRequestUseCaseResult.Failure(it)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@ import com.wire.kalium.network.api.base.authenticated.connection.ConnectionRespo
import com.wire.kalium.network.api.base.authenticated.connection.ConnectionStateDTO
import com.wire.kalium.network.api.base.authenticated.userDetails.UserDetailsApi
import com.wire.kalium.network.api.base.model.ConversationId
import com.wire.kalium.network.api.base.model.FederationUnreachableResponse
import com.wire.kalium.network.api.base.model.LegalHoldStatusDTO
import com.wire.kalium.network.api.base.model.QualifiedID
import com.wire.kalium.network.api.base.model.UserProfileDTO
import com.wire.kalium.network.exceptions.KaliumException
import com.wire.kalium.network.utils.NetworkResponse
import com.wire.kalium.persistence.dao.ConnectionDAO
import com.wire.kalium.persistence.dao.ConnectionEntity
import com.wire.kalium.persistence.dao.ConversationIDEntity
import com.wire.kalium.persistence.dao.QualifiedIDEntity
import com.wire.kalium.persistence.dao.UserDAO
import com.wire.kalium.persistence.dao.UserIDEntity
Expand All @@ -63,6 +65,7 @@ import io.mockative.twice
import io.mockative.verify
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.test.runTest
import kotlinx.datetime.Instant
import kotlin.test.Test
import com.wire.kalium.network.api.base.model.UserId as NetworkUserId

Expand Down Expand Up @@ -309,6 +312,70 @@ class ConnectionRepositoryTest {
.wasInvoked(once)
}

@Test
fun givenAConnectionRequestIgnore_WhenSendingAConnectionStatusValid_thenTheConnectionShouldBePersisted() = runTest {
// given
val userId = NetworkUserId("user_id", "domain_id")
val (arrangement, connectionRepository) = Arrangement().arrange()
arrangement
.withSuccessfulUpdateConnectionStatusResponse(userId)
.withSuccessfulFetchSelfUserConnectionsResponse(arrangement.stubUserProfileDTO)

// when
val result = connectionRepository.ignoreConnectionRequest(UserId(userId.value, userId.domain))
result.shouldSucceed { arrangement.stubConnectionOne }

// then
verify(arrangement.connectionApi)
.suspendFunction(arrangement.connectionApi::updateConnection)
.with(eq(userId), eq(ConnectionStateDTO.IGNORED))
.wasInvoked(once)
}

@Test
fun givenAConnectionRequestIgnore_WhenApiUpdateFailedWithFederatedFailedDomains_thenTheConnectionShouldBePersisted() = runTest {
// given
val userId = NetworkUserId("user_id", "domain_id")
val (arrangement, connectionRepository) = Arrangement().arrange()
arrangement
.withErrorUpdatingConnectionStatusResponse(
userId,
KaliumException.FederationUnreachableException(FederationUnreachableResponse())
)
.withConnectionEntityByUser()
.withSuccessfulFetchSelfUserConnectionsResponse(arrangement.stubUserProfileDTO)

// when
val result = connectionRepository.ignoreConnectionRequest(UserId(userId.value, userId.domain))
result.shouldSucceed { arrangement.stubConnectionOne }

// then
verify(arrangement.connectionApi)
.suspendFunction(arrangement.connectionApi::updateConnection)
.with(eq(userId), eq(ConnectionStateDTO.IGNORED))
.wasInvoked(once)
}

@Test
fun givenAConnectionRequestIgnore_WhenApiUpdateFailedWithNonFederatedFailedDomains_thenTheConnectionNotShouldBePersisted() = runTest {
// given
val userId = NetworkUserId("user_id", "domain_id")
val (arrangement, connectionRepository) = Arrangement().arrange()
arrangement
.withErrorUpdatingConnectionStatusResponse(userId)
.withSuccessfulFetchSelfUserConnectionsResponse(arrangement.stubUserProfileDTO)

// when
val result = connectionRepository.ignoreConnectionRequest(UserId(userId.value, userId.domain))
result.shouldFail { arrangement.stubConnectionOne }

// then
verify(arrangement.connectionApi)
.suspendFunction(arrangement.connectionApi::updateConnection)
.with(eq(userId), eq(ConnectionStateDTO.IGNORED))
.wasInvoked(once)
}

private class Arrangement :
MemberDAOArrangement by MemberDAOArrangementImpl() {
@Mock
Expand Down Expand Up @@ -384,6 +451,16 @@ class ConnectionRepositoryTest {
val stubConversationID1 = QualifiedIDEntity("conversationId1", "domain")
val stubConversationID2 = QualifiedIDEntity("conversationId2", "domain")

val connectionEntity = ConnectionEntity(
conversationId = "conversationId1",
from = "fromId",
lastUpdateDate = Instant.DISTANT_PAST,
qualifiedConversationId = ConversationIDEntity("conversationId1", "domain"),
qualifiedToId = QualifiedIDEntity("connectionId1", "domain"),
status = ConnectionEntity.State.ACCEPTED,
toId = "connectionId1"
)

fun withSelfUserTeamId(either: Either<CoreFailure, TeamId?>): Arrangement {
given(selfTeamIdProvider)
.suspendFunction(selfTeamIdProvider::invoke)
Expand Down Expand Up @@ -448,7 +525,7 @@ class ConnectionRepositoryTest {
fun withSuccessfulUpdateConnectionStatusResponse(userId: NetworkUserId): Arrangement = apply {
given(connectionApi)
.suspendFunction(connectionApi::updateConnection)
.whenInvokedWith(eq(userId), eq(ConnectionStateDTO.ACCEPTED))
.whenInvokedWith(eq(userId), any())
.then { _, _ -> NetworkResponse.Success(stubConnectionOne, mapOf(), 200) }

withUpdateOrInsertOneOnOneMemberSuccess(
Expand All @@ -457,11 +534,14 @@ class ConnectionRepositoryTest {
)
}

fun withErrorUpdatingConnectionStatusResponse(userId: NetworkUserId): Arrangement = apply {
fun withErrorUpdatingConnectionStatusResponse(
userId: NetworkUserId,
exception: KaliumException = KaliumException.GenericError(RuntimeException("An error the server threw!"))
): Arrangement = apply {
given(connectionApi)
.suspendFunction(connectionApi::updateConnection)
.whenInvokedWith(eq(userId), any())
.then { _, _ -> NetworkResponse.Error(KaliumException.GenericError(RuntimeException("An error the server threw!"))) }
.then { _, _ -> NetworkResponse.Error(exception) }
}

fun withDeleteConnectionDataAndConversation(conversationId: QualifiedIDEntity): Arrangement = apply {
Expand All @@ -471,6 +551,13 @@ class ConnectionRepositoryTest {
.thenReturn(Unit)
}

fun withConnectionEntityByUser(): Arrangement = apply {
given(connectionDAO)
.suspendFunction(connectionDAO::getConnectionByUser)
.whenInvokedWith(any())
.thenReturn(connectionEntity)
}

fun withSuccessfulFetchSelfUserConnectionsResponse(stubUserProfileDTO: UserProfileDTO): Arrangement {
given(connectionApi)
.suspendFunction(connectionApi::fetchSelfUserConnections)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,27 +52,27 @@ class IgnoreConnectionRequestUseCaseTest {
fun givenAConnectionRequest_whenInvokingIgnoreConnectionRequestAndOk_thenShouldReturnsASuccessResult() = runTest {
// given
given(connectionRepository)
.suspendFunction(connectionRepository::updateConnectionStatus)
.whenInvokedWith(eq(userId), eq(ConnectionState.IGNORED))
.thenReturn(Either.Right(connection))
.suspendFunction(connectionRepository::ignoreConnectionRequest)
.whenInvokedWith(eq(userId))
.thenReturn(Either.Right(Unit))

// when
val resultOk = ignoreConnectionRequestUseCase(userId)

// then
assertEquals(IgnoreConnectionRequestUseCaseResult.Success, resultOk)
verify(connectionRepository)
.suspendFunction(connectionRepository::updateConnectionStatus)
.with(eq(userId), eq(ConnectionState.IGNORED))
.suspendFunction(connectionRepository::ignoreConnectionRequest)
.with(eq(userId))
.wasInvoked(once)
}

@Test
fun givenAConnectionRequest_whenInvokingIgnoreConnectionRequestAndFails_thenShouldReturnsAFailureResult() = runTest {
// given
given(connectionRepository)
.suspendFunction(connectionRepository::updateConnectionStatus)
.whenInvokedWith(eq(userId), eq(ConnectionState.IGNORED))
.suspendFunction(connectionRepository::ignoreConnectionRequest)
.whenInvokedWith(eq(userId))
.thenReturn(Either.Left(CoreFailure.Unknown(RuntimeException("Some error"))))

// when
Expand All @@ -81,8 +81,8 @@ class IgnoreConnectionRequestUseCaseTest {
// then
assertEquals(IgnoreConnectionRequestUseCaseResult.Failure::class, resultFailure::class)
verify(connectionRepository)
.suspendFunction(connectionRepository::updateConnectionStatus)
.with(eq(userId), eq(ConnectionState.IGNORED))
.suspendFunction(connectionRepository::ignoreConnectionRequest)
.with(eq(userId))
.wasInvoked(once)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ internal interface ConnectionRepositoryArrangement {
fun withDeleteConnection(result: Either<StorageFailure, Unit>, connection: Matcher<Connection> = any())
fun withConnectionList(connectionsFlow: Flow<List<ConversationDetails>>)
fun withUpdateConnectionStatus(result: Either<CoreFailure, Connection>)
fun withIgnoreConnectionRequest(result: Either<CoreFailure, Unit>)
}

internal open class ConnectionRepositoryArrangementImpl : ConnectionRepositoryArrangement {
Expand Down Expand Up @@ -75,4 +76,11 @@ internal open class ConnectionRepositoryArrangementImpl : ConnectionRepositoryAr
.whenInvokedWith(any(), any())
.thenReturn(result)
}

override fun withIgnoreConnectionRequest(result: Either<CoreFailure, Unit>) {
given(connectionRepository)
.suspendFunction(connectionRepository::ignoreConnectionRequest)
.whenInvokedWith(any())
.thenReturn(result)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@ package com.wire.kalium.network.api.v4.authenticated

import com.wire.kalium.network.AuthenticatedNetworkClient
import com.wire.kalium.network.api.base.authenticated.connection.ConnectionDTO
import com.wire.kalium.network.api.base.authenticated.connection.ConnectionStateDTO
import com.wire.kalium.network.api.base.authenticated.connection.UpdateConnectionRequest
import com.wire.kalium.network.api.base.model.UserId
import com.wire.kalium.network.api.v3.authenticated.ConnectionApiV3
import com.wire.kalium.network.exceptions.KaliumException
import com.wire.kalium.network.utils.NetworkResponse
import com.wire.kalium.network.utils.wrapFederationResponse
import com.wire.kalium.network.utils.wrapKaliumResponse
import io.ktor.client.request.post
import io.ktor.client.request.put
import io.ktor.client.request.setBody
import io.ktor.utils.io.errors.IOException

internal open class ConnectionApiV4 internal constructor(
Expand All @@ -40,4 +44,15 @@ internal open class ConnectionApiV4 internal constructor(
} catch (e: IOException) {
NetworkResponse.Error(KaliumException.GenericError(e))
}

override suspend fun updateConnection(userId: UserId, connectionStatus: ConnectionStateDTO): NetworkResponse<ConnectionDTO> =
try {
httpClient.put("$PATH_CONNECTIONS_ENDPOINTS/${userId.domain}/${userId.value}") {
setBody(UpdateConnectionRequest(connectionStatus))
}.let { response ->
wrapFederationResponse(response) { wrapKaliumResponse { response } }
}
} catch (e: IOException) {
NetworkResponse.Error(KaliumException.GenericError(e))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,6 @@ SELECT * FROM Connection LEFT JOIN User ON Connection.qualified_to == User.quali

selectConnectionsForNotification:
SELECT * FROM Connection LEFT JOIN User ON Connection.qualified_to == User.qualified_id WHERE status = 'PENDING' AND should_notify = 1;

selectConnectionRequestByUser:
SELECT * FROM Connection LEFT JOIN User ON Connection.qualified_to == User.qualified_id WHERE Connection.qualified_to = :user_id;
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,5 @@ interface ConnectionDAO {
suspend fun getConnectionRequestsForNotification(): Flow<List<ConnectionEntity>>
suspend fun updateNotificationFlag(flag: Boolean, userId: QualifiedIDEntity)
suspend fun setAllConnectionsAsNotified()
suspend fun getConnectionByUser(userId: QualifiedIDEntity): ConnectionEntity?
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,4 +183,10 @@ class ConnectionDAOImpl(
override suspend fun setAllConnectionsAsNotified() = withContext(queriesContext) {
connectionsQueries.setAllConnectionsAsNotified()
}

override suspend fun getConnectionByUser(userId: QualifiedIDEntity): ConnectionEntity? {
return withContext(queriesContext) {
connectionsQueries.selectConnectionRequestByUser(userId, connectionMapper::toModel).executeAsOneOrNull()
}
}
}

0 comments on commit f2fe483

Please sign in to comment.