Skip to content

Commit

Permalink
Merge branch 'develop' into feat/parse-and-store-locationmessags
Browse files Browse the repository at this point in the history
  • Loading branch information
yamilmedina authored Dec 6, 2023
2 parents 47c8f0a + a9f9834 commit 5a9eb0e
Show file tree
Hide file tree
Showing 13 changed files with 299 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ sealed interface CoreFailure {
data object MissingClientRegistration : CoreFailure

/**
* A user has no key packages available which prevents him/her from being added
* Key packages requested not available which prevents them from being added
* to an existing or new conversation.
*/
data class NoKeyPackagesAvailable(val userId: UserId) : CoreFailure
data class NoKeyPackagesAvailable(val failedUserIds: Set<UserId>) : CoreFailure

/**
* It's not allowed to run the application with development API enabled when
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ import com.wire.kalium.logic.NetworkFailure
import com.wire.kalium.logic.data.event.EventMapper
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.id.GroupID
import com.wire.kalium.logic.data.id.SelfTeamIdProvider
import com.wire.kalium.logic.data.id.toApi
import com.wire.kalium.logic.data.id.toDao
import com.wire.kalium.logic.data.id.toModel
import com.wire.kalium.logic.data.service.ServiceId
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.di.MapperProvider
import com.wire.kalium.logic.data.id.SelfTeamIdProvider
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.flatMap
import com.wire.kalium.logic.functional.map
Expand Down Expand Up @@ -185,15 +185,95 @@ internal class ConversationGroupRepositoryImpl(
is ConversationEntity.ProtocolInfo.Mixed ->
tryAddMembersToCloudAndStorage(userIdList, conversationId)
.flatMap {
// best effort approach for migrated conversations, no retries
mlsConversationRepository.addMemberToMLSGroup(GroupID(protocol.groupId), userIdList)
}

is ConversationEntity.ProtocolInfo.MLS -> {
mlsConversationRepository.addMemberToMLSGroup(GroupID(protocol.groupId), userIdList)
tryAddMembersToMLSGroup(conversationId, protocol.groupId, userIdList)
}
}
}

/**
* Handle the error cases and retry for claimPackages offline and out of packages.
* Handle error case and retry for sendingCommit unreachable.
*/
private suspend fun tryAddMembersToMLSGroup(
conversationId: ConversationId,
groupId: String,
userIdList: List<UserId>,
failedUsersList: Set<UserId> = emptySet(),
remainingAttempts: Int = 2
): Either<CoreFailure, Unit> {
return when (val addingMemberResult = mlsConversationRepository.addMemberToMLSGroup(GroupID(groupId), userIdList)) {
is Either.Right -> handleMLSMembersAdded(conversationId, userIdList, failedUsersList)
is Either.Left -> {
addingMemberResult.value.handleMLSMembersFailed(
conversationId = conversationId,
groupId = groupId,
userIdList = userIdList,
failedUsersList = failedUsersList,
remainingAttempts = remainingAttempts,
)
}
}
}

private suspend fun CoreFailure.handleMLSMembersFailed(
conversationId: ConversationId,
groupId: String,
userIdList: List<UserId>,
failedUsersList: Set<UserId>,
remainingAttempts: Int,
): Either<CoreFailure, Unit> {
return when {
// claiming key packages offline or out of packages
this is CoreFailure.NoKeyPackagesAvailable && remainingAttempts > 0 -> {
val (validUsers, failedUsers) = userIdList.partition { !this.failedUserIds.contains(it) }
tryAddMembersToMLSGroup(
conversationId = conversationId,
groupId = groupId,
userIdList = validUsers,
failedUsersList = (failedUsersList + failedUsers).toSet(),
remainingAttempts = remainingAttempts - 1
)
}

// sending commit unreachable
this is NetworkFailure.FederatedBackendFailure.RetryableFailure && remainingAttempts > 0 -> {
val (validUsers, failedUsers) = extractValidUsersForRetryableFederationError(userIdList, this)
tryAddMembersToMLSGroup(
conversationId = conversationId,
groupId = groupId,
userIdList = validUsers,
failedUsersList = (failedUsersList + failedUsers).toSet(),
remainingAttempts = remainingAttempts - 1
)
}

else -> {
newGroupConversationSystemMessagesCreator.value.conversationFailedToAddMembers(
conversationId, (failedUsersList + userIdList)
).flatMap {
Either.Left(this)
}
}
}
}

private suspend fun handleMLSMembersAdded(
conversationId: ConversationId,
userIdList: List<UserId>,
failedUsersList: Set<UserId>
): Either<CoreFailure, Unit> {
return newGroupConversationSystemMessagesCreator.value.conversationResolvedMembersAddedAndFailed(
conversationId.toDao(), userIdList, failedUsersList.toList()
).flatMap {
Either.Right(Unit)
}
}

override suspend fun addService(serviceId: ServiceId, conversationId: ConversationId): Either<CoreFailure, Unit> =
wrapStorageRequest { conversationDAO.getConversationProtocolInfo(conversationId.toDao()) }
.flatMap { protocol ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.wire.kalium.logic.data.conversation

import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.data.id.IdMapper
import com.wire.kalium.logic.data.id.toModel
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.di.MapperProvider
import com.wire.kalium.logic.functional.Either
Expand Down Expand Up @@ -59,7 +60,7 @@ internal class NewConversationMembersRepositoryImpl(
}.flatMap {
newGroupConversationSystemMessagesCreator.value.conversationResolvedMembersAddedAndFailed(
conversationId,
conversationResponse,
conversationResponse.members.otherMembers.map { it.id.toModel() },
failedUsersList
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import com.wire.kalium.logic.data.message.Message
import com.wire.kalium.logic.data.message.MessageContent
import com.wire.kalium.logic.data.message.PersistMessageUseCase
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.di.MapperProvider
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.fold
import com.wire.kalium.network.api.base.authenticated.conversation.ConversationResponse
Expand All @@ -47,7 +46,7 @@ internal interface NewGroupConversationSystemMessagesCreator {
suspend fun conversationReadReceiptStatus(conversation: ConversationResponse): Either<CoreFailure, Unit>
suspend fun conversationResolvedMembersAddedAndFailed(
conversationId: ConversationIDEntity,
conversationResponse: ConversationResponse,
validUsers: List<UserId>,
failedUsersList: List<UserId> = emptyList()
): Either<CoreFailure, Unit>

Expand All @@ -64,7 +63,6 @@ internal class NewGroupConversationSystemMessagesCreatorImpl(
private val selfTeamIdProvider: SelfTeamIdProvider,
private val qualifiedIdMapper: QualifiedIdMapper,
private val selfUserId: UserId,
private val memberMapper: MemberMapper = MapperProvider.memberMapper()
) : NewGroupConversationSystemMessagesCreator {

override suspend fun conversationStarted(conversation: ConversationEntity) = run {
Expand Down Expand Up @@ -143,16 +141,14 @@ internal class NewGroupConversationSystemMessagesCreatorImpl(

override suspend fun conversationResolvedMembersAddedAndFailed(
conversationId: ConversationIDEntity,
conversationResponse: ConversationResponse,
validUsers: List<UserId>,
failedUsersList: List<UserId>
): Either<CoreFailure, Unit> = run {
if (conversationResponse.members.otherMembers.isNotEmpty()) {
if (validUsers.isNotEmpty()) {
persistMessage(
Message.System(
id = uuid4().toString(),
content = MessageContent.MemberChange.CreationAdded(
memberMapper.fromApiModel(conversationResponse.members).otherMembers.map { it.id }
),
content = MessageContent.MemberChange.CreationAdded(validUsers.toList()),
conversationId = conversationId.toModel(),
date = DateTimeUtil.currentIsoDateTimeString(),
senderUserId = selfUserId,
Expand All @@ -169,18 +165,22 @@ internal class NewGroupConversationSystemMessagesCreatorImpl(
override suspend fun conversationFailedToAddMembers(
conversationId: ConversationId,
userIdList: Set<UserId>
): Either<CoreFailure, Unit> {
val messageFailedToAddMembers = Message.System(
uuid4().toString(),
MessageContent.MemberChange.FailedToAdd(userIdList.toList()),
conversationId,
DateTimeUtil.currentIsoDateTimeString(),
selfUserId,
Message.Status.Sent,
Message.Visibility.VISIBLE,
expirationData = null
)
return persistMessage(messageFailedToAddMembers)
): Either<CoreFailure, Unit> = run {
if (userIdList.isNotEmpty()) {
persistMessage(
Message.System(
uuid4().toString(),
MessageContent.MemberChange.FailedToAdd(userIdList.toList()),
conversationId,
DateTimeUtil.currentIsoDateTimeString(),
selfUserId,
Message.Status.Sent,
Message.Visibility.VISIBLE,
expirationData = null
)
)
}
Either.Right(Unit)
}

private suspend fun createFailedToAddSystemMessage(conversationId: ConversationIDEntity, failedUsersList: List<UserId>) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,12 @@ import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.NetworkFailure
import com.wire.kalium.logic.data.client.MLSClientProvider
import com.wire.kalium.logic.data.conversation.ClientId
import com.wire.kalium.logic.data.id.IdMapper
import com.wire.kalium.logic.data.id.CurrentClientIdProvider
import com.wire.kalium.logic.data.id.toApi
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.di.MapperProvider
import com.wire.kalium.logic.data.id.CurrentClientIdProvider
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.flatMap
import com.wire.kalium.logic.functional.foldToEitherWhileRight
import com.wire.kalium.logic.functional.fold
import com.wire.kalium.logic.wrapApiRequest
import com.wire.kalium.logic.wrapMLSRequest
import com.wire.kalium.network.api.base.authenticated.keypackage.KeyPackageApi
Expand Down Expand Up @@ -58,25 +56,28 @@ class KeyPackageDataSource(
private val keyPackageApi: KeyPackageApi,
private val mlsClientProvider: MLSClientProvider,
private val selfUserId: UserId,
private val idMapper: IdMapper = MapperProvider.idMapper(),
) : KeyPackageRepository {

override suspend fun claimKeyPackages(userIds: List<UserId>): Either<CoreFailure, List<KeyPackageDTO>> =
currentClientIdProvider().flatMap { selfClientId ->
userIds.map { userId ->
val failedUsers = mutableSetOf<UserId>()
val claimedKeyPackages = mutableListOf<KeyPackageDTO>()
userIds.forEach { userId ->
wrapApiRequest {
keyPackageApi.claimKeyPackages(
KeyPackageApi.Param.SkipOwnClient(userId.toApi(), selfClientId.value)
)
}.flatMap {
keyPackageApi.claimKeyPackages(KeyPackageApi.Param.SkipOwnClient(userId.toApi(), selfClientId.value))
}.fold({ failedUsers.add(userId) }) {
if (it.keyPackages.isEmpty() && userId != selfUserId) {
Either.Left(CoreFailure.NoKeyPackagesAvailable(userId))
failedUsers.add(userId)
} else {
Either.Right(it.keyPackages)
claimedKeyPackages.addAll(it.keyPackages)
}
}
}.foldToEitherWhileRight(emptyList()) { item, acc ->
item.flatMap { Either.Right(acc + it) }
}

if (failedUsers.isNotEmpty()) {
Either.Left(CoreFailure.NoKeyPackagesAvailable(failedUsers))
} else {
Either.Right(claimedKeyPackages)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ internal class NewConversationEventHandlerImpl(
newGroupConversationSystemMessagesCreator.conversationStarted(event.senderUserId, event.conversation)
newGroupConversationSystemMessagesCreator.conversationResolvedMembersAddedAndFailed(
event.conversationId.toDao(),
event.conversation
event.conversation.members.otherMembers.map { it.id.toModel() }
)
newGroupConversationSystemMessagesCreator.conversationReadReceiptStatus(event.conversation)
newGroupConversationSystemMessagesCreator.conversationStartedUnverifiedWarning(event.conversation.id.toModel())
Expand Down
Loading

0 comments on commit 5a9eb0e

Please sign in to comment.