Skip to content

Commit

Permalink
fix: multiple defederation system message [WPB-3912] (#1988)
Browse files Browse the repository at this point in the history
* fix: multiple defederation system message

* review fixes

* detekt fix

* removed unused argument

* added ExperimentalCoroutinesApi

* added named argument

* updated ktx serialization

---------

Co-authored-by: Yamil Medina <yamilmedina@users.noreply.github.com>
  • Loading branch information
Garzas and yamilmedina authored Aug 18, 2023
1 parent 3f69dc2 commit 856da30
Show file tree
Hide file tree
Showing 16 changed files with 186 additions and 157 deletions.
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ androidx-test-orchestrator = "1.4.2"
androidx-sqlite = "2.3.0"
benasher-uuid = "0.8.0"
ktx-datetime = { strictly = "0.4.0" }
ktx-serialization = "1.4.1"
ktx-serialization = "1.5.1"
ktx-atomicfu = "0.18.5"
kover = "0.7.1"
multiplatform-settings = "1.0.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,3 +309,5 @@ typealias ClientId = PlainId
data class Recipient(val id: UserId, val clients: List<ClientId>)

typealias UnreadEventCount = Map<UnreadEventType, Int>
typealias OneOnOneMembers = Map<ConversationId, UserId>
typealias GroupConversationMembers = Map<ConversationId, List<UserId>>
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import com.wire.kalium.logic.data.id.toApi
import com.wire.kalium.logic.data.id.toCrypto
import com.wire.kalium.logic.data.id.toDao
import com.wire.kalium.logic.data.id.toModel
import com.wire.kalium.logic.data.member.ConversationsWithMembers
import com.wire.kalium.logic.data.message.MessageMapper
import com.wire.kalium.logic.data.message.UnreadEventType
import com.wire.kalium.logic.data.user.UserId
Expand Down Expand Up @@ -208,10 +207,14 @@ interface ConversationRepository {
isInformed: Boolean
): Either<StorageFailure, Unit>

suspend fun getConversationsWithMembersWithBothDomains(
suspend fun getGroupConversationsWithMembersWithBothDomains(
firstDomain: String,
secondDomain: String
): Either<CoreFailure, ConversationsWithMembers>
): Either<CoreFailure, GroupConversationMembers>

suspend fun getOneOnOneConversationsWithFederatedMembers(
domain: String
): Either<CoreFailure, OneOnOneMembers>
}

@Suppress("LongParameterList", "TooManyFunctions")
Expand Down Expand Up @@ -330,7 +333,7 @@ internal class ConversationDataSource internal constructor(
conversations.forEach { conversationsResponse ->
// do the cleanup of members from conversation in case when self user rejoined conversation
// and may not received any member remove or leave events
if (invalidateMembers) {
if (invalidateMembers && conversationsResponse.type == ConversationResponse.Type.GROUP) {
memberDAO.updateFullMemberList(
memberMapper.fromApiModelToDaoModel(conversationsResponse.members),
idMapper.fromApiToDao(conversationsResponse.id)
Expand Down Expand Up @@ -783,15 +786,21 @@ internal class ConversationDataSource internal constructor(
conversationMetaDataDAO.setInformedAboutDegradedMLSVerificationFlag(conversationId.toDao(), isInformed)
}

override suspend fun getConversationsWithMembersWithBothDomains(
override suspend fun getGroupConversationsWithMembersWithBothDomains(
firstDomain: String,
secondDomain: String
): Either<CoreFailure, ConversationsWithMembers> = wrapStorageRequest {
val entity = memberDAO.getConversationWithUserIdsWithBothDomains(firstDomain, secondDomain)
ConversationsWithMembers(
oneOnOne = entity.oneOnOne.mapKeys { it.key.toModel() }.mapValues { it.value.map { userIdEntity -> userIdEntity.toModel() } },
group = entity.group.mapKeys { it.key.toModel() }.mapValues { it.value.map { userIdEntity -> userIdEntity.toModel() } }
)
): Either<CoreFailure, GroupConversationMembers> = wrapStorageRequest {
memberDAO.getGroupConversationWithUserIdsWithBothDomains(firstDomain, secondDomain)
.mapKeys { it.key.toModel() }
.mapValues { it.value.map { userId -> userId.toModel() } }
}

override suspend fun getOneOnOneConversationsWithFederatedMembers(
domain: String
): Either<CoreFailure, OneOnOneMembers> = wrapStorageRequest {
memberDAO.getOneOneConversationWithFederatedMembers(domain)
.mapKeys { it.key.toModel() }
.mapValues { it.value.toModel() }
}

private suspend fun persistIncompleteConversations(
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package com.wire.kalium.logic.data.publicuser

import com.wire.kalium.logic.NetworkFailure
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.id.IdMapper
import com.wire.kalium.logic.data.id.QualifiedID
import com.wire.kalium.logic.data.id.toDao
import com.wire.kalium.logic.data.publicuser.model.UserSearchResult
Expand All @@ -41,13 +40,12 @@ import com.wire.kalium.persistence.dao.MetadataDAO
import com.wire.kalium.persistence.dao.QualifiedIDEntity
import com.wire.kalium.persistence.dao.UserDAO
import com.wire.kalium.persistence.dao.UserEntity
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.map
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.json.Json

internal interface SearchUserRepository {
Expand Down Expand Up @@ -96,8 +94,7 @@ internal class SearchUserRepositoryImpl(
private val userSearchAPiWrapper: UserSearchApiWrapper,
private val publicUserMapper: PublicUserMapper = MapperProvider.publicUserMapper(),
private val userMapper: UserMapper = MapperProvider.userMapper(),
private val userTypeMapper: DomainUserTypeMapper = MapperProvider.userTypeMapper(),
private val idMapper: IdMapper = MapperProvider.idMapper()
private val userTypeMapper: DomainUserTypeMapper = MapperProvider.userTypeMapper()
) : SearchUserRepository {

override suspend fun searchKnownUsersByNameOrHandleOrEmail(
Expand Down Expand Up @@ -181,12 +178,12 @@ internal class SearchUserRepositoryImpl(
// TODO: code duplication here for getting self user, the same is done inside
// UserRepository, what would be best ?
// creating SelfUserDao managing the UserEntity corresponding to SelfUser ?
@OptIn(FlowPreview::class)
@OptIn(ExperimentalCoroutinesApi::class)
private suspend fun getSelfUser(): SelfUser {
return metadataDAO.valueByKeyFlow(UserDataSource.SELF_USER_ID_KEY)
.filterNotNull()
.flatMapMerge { encodedValue ->
val selfUserID: QualifiedIDEntity = Json.decodeFromString(encodedValue)
val selfUserID: QualifiedIDEntity = Json.decodeFromString(string = encodedValue)

userDAO.getUserByQualifiedID(selfUserID)
.filterNotNull()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,24 +81,18 @@ class FederationEventReceiverImpl internal constructor(
}
}

conversationRepository.getConversationsWithMembersWithBothDomains(event.domain, selfUserId.domain)
conversationRepository.getOneOnOneConversationsWithFederatedMembers(event.domain)
.onSuccess { conversationsWithMembers ->
// mark users as defederated to hold conversation history in oneOnOne conversations
conversationsWithMembers.oneOnOne.forEach { (conversationId, userIds) ->
if (conversationId.domain == event.domain) {
handleFederationDeleteEvent(conversationId, event.domain)
userIds.filter { it.domain == event.domain }.forEach { userId ->
userRepository.defederateUser(userId)
}
} else {
userIds.filter { it.domain == event.domain }.forEach { userId ->
handleFederationDeleteEvent(conversationId, event.domain)
userRepository.defederateUser(userId)
}
}
conversationsWithMembers.forEach { (conversationId, userId) ->
handleFederationDeleteEvent(conversationId, event.domain)
userRepository.defederateUser(userId)
}
}

conversationsWithMembers.group.forEach { (conversationId, userIds) ->
conversationRepository.getGroupConversationsWithMembersWithBothDomains(event.domain, selfUserId.domain)
.onSuccess { conversationsWithMembers ->
conversationsWithMembers.forEach { (conversationId, userIds) ->
handleFederationDeleteEvent(conversationId, event.domain)
when (conversationId.domain) {
// remove defederated users from self domain conversations
Expand Down Expand Up @@ -145,9 +139,9 @@ class FederationEventReceiverImpl internal constructor(
val firstDomain = event.domains.first()
val secondDomain = event.domains.last()

conversationRepository.getConversationsWithMembersWithBothDomains(firstDomain, secondDomain)
conversationRepository.getGroupConversationsWithMembersWithBothDomains(firstDomain, secondDomain)
.onSuccess { conversationsWithMembers ->
conversationsWithMembers.group.forEach { (conversationId, userIds) ->
conversationsWithMembers.forEach { (conversationId, userIds) ->
handleFederationConnectionRemovedEvent(conversationId, event.domains)
when (conversationId.domain) {
// remove secondDomain users from firstDomain conversation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ import com.wire.kalium.persistence.dao.conversation.ConversationDAO
import com.wire.kalium.persistence.dao.conversation.ConversationEntity
import com.wire.kalium.persistence.dao.conversation.ConversationMetaDataDAO
import com.wire.kalium.persistence.dao.conversation.ConversationViewEntity
import com.wire.kalium.persistence.dao.member.ConversationsWithMembersEntity
import com.wire.kalium.persistence.dao.message.MessageDAO
import com.wire.kalium.persistence.dao.message.MessagePreviewEntity
import com.wire.kalium.persistence.dao.unread.ConversationUnreadEventEntity
Expand Down Expand Up @@ -1015,7 +1014,7 @@ class ConversationRepositoryTest {
// Given
val federatedDomain = "federated.com"
val selfDomain = "selfdomain.com"
val singleFederatedUserList = listOf(QualifiedIDEntity("fed_user", federatedDomain))
val federatedUserId = QualifiedIDEntity("fed_user", federatedDomain)
val federatedUserIdList = List(5) {
QualifiedIDEntity("fed_user_$it", federatedDomain)
}
Expand All @@ -1027,27 +1026,33 @@ class ConversationRepositoryTest {
val groupConversationId = QualifiedIDEntity("conversation_group", selfDomain)
val oneOnOneConversationId = QualifiedIDEntity("conversation_one_on_one", selfDomain)


val conversationsWithMembersEntity = ConversationsWithMembersEntity(
oneOnOne = mapOf(oneOnOneConversationId to singleFederatedUserList),
group = mapOf(groupConversationId to userIdList)
)

val (arrangement, conversationRepository) = Arrangement()
.withGetConversationWithUserIdsWithBothDomains(conversationsWithMembersEntity, eq(selfDomain), eq(federatedDomain))
.withGetGroupConversationWithUserIdsWithBothDomains(
mapOf(groupConversationId to userIdList),
eq(selfDomain),
eq(federatedDomain)
)
.withGetOneOnOneConversationWithFederatedUserId(
mapOf(oneOnOneConversationId to federatedUserId),
eq(federatedDomain)
)
.arrange()

// When
val result = conversationRepository.getConversationsWithMembersWithBothDomains(selfDomain, federatedDomain)
val groupConversationResult = conversationRepository.getGroupConversationsWithMembersWithBothDomains(selfDomain, federatedDomain)
val oneOnOneConversationResult = conversationRepository.getOneOnOneConversationsWithFederatedMembers(federatedDomain)

// Then
result.shouldSucceed {
assertEquals(userIdList.map { idEntity -> idEntity.toModel() }, it.group[groupConversationId.toModel()])
assertEquals(singleFederatedUserList.map { idEntity -> idEntity.toModel() }, it.oneOnOne[oneOnOneConversationId.toModel()])
groupConversationResult.shouldSucceed {
assertEquals(userIdList.map { idEntity -> idEntity.toModel() }, it[groupConversationId.toModel()])
}

oneOnOneConversationResult.shouldSucceed {
assertEquals(federatedUserId.toModel(), it[oneOnOneConversationId.toModel()])
}

verify(arrangement.memberDAO)
.suspendFunction(arrangement.memberDAO::getConversationWithUserIdsWithBothDomains)
.suspendFunction(arrangement.memberDAO::getGroupConversationWithUserIdsWithBothDomains)
.with(any(), any())
.wasInvoked(exactly = once)
}
Expand Down Expand Up @@ -1367,17 +1372,27 @@ class ConversationRepositoryTest {
.thenReturn(result)
}

fun withGetConversationWithUserIdsWithBothDomains(
result: ConversationsWithMembersEntity,
fun withGetGroupConversationWithUserIdsWithBothDomains(
result: Map<ConversationIDEntity, List<UserIDEntity>>,
firstDomain: Matcher<String> = any(),
secondDomain: Matcher<String> = any()
) = apply {
given(memberDAO)
.suspendFunction(memberDAO::getConversationWithUserIdsWithBothDomains)
.suspendFunction(memberDAO::getGroupConversationWithUserIdsWithBothDomains)
.whenInvokedWith(firstDomain, secondDomain)
.thenReturn(result)
}

fun withGetOneOnOneConversationWithFederatedUserId(
result: Map<ConversationIDEntity, UserIDEntity>,
domain: Matcher<String> = any()
) = apply {
given(memberDAO)
.suspendFunction(memberDAO::getOneOneConversationWithFederatedMembers)
.whenInvokedWith(domain)
.thenReturn(result)
}

fun arrange() = this to conversationRepository
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,7 @@ class SearchUserRepositoryTest {
userSearchApiWrapper,
publicUserMapper,
userMapper,
domainUserTypeMapper,
idMapper
domainUserTypeMapper
)

given(domainUserTypeMapper).invocation { federated }.then { UserType.FEDERATED }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import com.wire.kalium.logic.data.event.Event
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.id.QualifiedID
import com.wire.kalium.logic.data.id.toDao
import com.wire.kalium.logic.data.member.ConversationsWithMembers
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.framework.TestConversationDetails
import com.wire.kalium.logic.framework.TestUser
Expand Down Expand Up @@ -73,12 +72,8 @@ class FederationEventReceiverTest {

val userIdWithBothDomainsList = defederatedUserIdList + selfUserIdList
val defederatedOneOnOneConversations = mapOf(
selfConversation.copy("1on1") to listOf(UserId("someDef", defederatedDomain), selfUserId),
defederatedConversation.copy("def1on1") to listOf(selfUserId, UserId("someDefTwo", defederatedDomain)),
)
val otherOneOnOneConversations = mapOf(
otherConversation.copy("other1on1") to listOf(selfUserId, UserId("someOther", otherDomain)),
selfConversation.copy("1on1self") to listOf(selfUserId, UserId("someOtherTwo", otherDomain)),
selfConversation.copy("1on1") to UserId("someDef", defederatedDomain),
defederatedConversation.copy("def1on1") to UserId("someDefTwo", defederatedDomain),
)

val defederatedGroupConversations = mapOf(
Expand All @@ -94,14 +89,8 @@ class FederationEventReceiverTest {
val (arrangement, useCase) = arrange {
withGetConnections(Either.Right(flowOf(connectionConversationList)))
withDeleteConnection(Either.Right(Unit))
withGetConversationsWithMembersWithBothDomains(
Either.Right(
ConversationsWithMembers(
oneOnOne = defederatedOneOnOneConversations + otherOneOnOneConversations,
group = defederatedGroupConversations
)
)
)
withGetGroupConversationsWithMembersWithBothDomains(Either.Right(defederatedGroupConversations))
withGetOneOnOneConversationsWithFederatedMember(Either.Right(defederatedOneOnOneConversations))
withDefederateUser(Either.Right(Unit))
withDeleteMembersByQualifiedID()
withPersistingMessage(Either.Right(Unit))
Expand Down Expand Up @@ -172,14 +161,7 @@ class FederationEventReceiverTest {
val systemMessageCount = defederatedGroupConversations.size * 2

val (arrangement, useCase) = arrange {
withGetConversationsWithMembersWithBothDomains(
Either.Right(
ConversationsWithMembers(
oneOnOne = mapOf(),
group = defederatedGroupConversations
)
)
)
withGetGroupConversationsWithMembersWithBothDomains(Either.Right(defederatedGroupConversations))
withDeleteMembersByQualifiedID()
withPersistingMessage(Either.Right(Unit))
}
Expand Down
Loading

0 comments on commit 856da30

Please sign in to comment.