Skip to content

Commit

Permalink
refactor(MLS): allow returning values when generating commits. (#2609)
Browse files Browse the repository at this point in the history
This should cause no changes in behaviour at all, but will allow customisation of some MLS pipelines in the near future.
This refactors the `retryOnCommitFailure` function (renamed to `produceAndSendCommitWithRetryAndResult`), so it allows returning a produced value if the commit succeeds.
The function now takes the responsibility of sending the generated commit, instead of relying on the callers to do so.
It also conveniently provides a MLSClient for the callers, as this is the way to obtain a `CommitBundle`.

(cherry picked from commit 390b880)
  • Loading branch information
vitorhugods committed Mar 13, 2024
1 parent 2b91cb1 commit ded4598
Showing 1 changed file with 187 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import com.wire.kalium.cryptography.CryptoCertificateStatus
import com.wire.kalium.cryptography.CryptoQualifiedClientId
import com.wire.kalium.cryptography.E2EIClient
import com.wire.kalium.cryptography.Ed22519Key
import com.wire.kalium.cryptography.MLSClient
import com.wire.kalium.cryptography.WireIdentity
import com.wire.kalium.logger.obfuscateId
import com.wire.kalium.logic.CoreFailure
Expand Down Expand Up @@ -309,20 +310,16 @@ internal class MLSConversationDataSource(
}

override suspend fun updateKeyingMaterial(groupID: GroupID): Either<CoreFailure, Unit> = withContext(serialDispatcher) {
retryOnCommitFailure(groupID) {
mlsClientProvider.getMLSClient().flatMap { mlsClient ->
wrapMLSRequest {
mlsClient.updateKeyingMaterial(idMapper.toCryptoModel(groupID))
}.flatMap { commitBundle ->
sendCommitBundle(groupID, commitBundle)
}.flatMap {
wrapStorageRequest {
conversationDAO.updateKeyingMaterial(
idMapper.toCryptoModel(groupID),
DateTimeUtil.currentInstant()
)
}
}
produceAndSendCommitWithRetry(groupID) {
wrapMLSRequest {
updateKeyingMaterial(idMapper.toCryptoModel(groupID))
}
}.flatMap {
wrapStorageRequest {
conversationDAO.updateKeyingMaterial(
idMapper.toCryptoModel(groupID),
DateTimeUtil.currentInstant()
)
}
}
}
Expand Down Expand Up @@ -375,28 +372,46 @@ internal class MLSConversationDataSource(
}
}

override suspend fun commitPendingProposals(groupID: GroupID): Either<CoreFailure, Unit> = withContext(serialDispatcher) {
retryOnCommitFailure(groupID) {
internalCommitPendingProposals(groupID)
override suspend fun commitPendingProposals(
groupID: GroupID
): Either<CoreFailure, Unit> = withContext(serialDispatcher) {
produceAndSendCommitWithRetry(groupID) {
getPendingCommitBundle(groupID)
}.flatMap {
wrapStorageRequest {
conversationDAO.clearProposalTimer(idMapper.toCryptoModel(groupID))
}
}
}

private suspend fun internalCommitPendingProposals(groupID: GroupID): Either<CoreFailure, Unit> =
mlsClientProvider.getMLSClient()
.flatMap { mlsClient ->
wrapMLSRequest {
mlsClient.commitPendingProposals(idMapper.toCryptoModel(groupID))
}.flatMap { commitBundle ->
commitBundle?.crlNewDistributionPoints?.let {
checkRevocationList(it)
}
commitBundle?.let { sendCommitBundle(groupID, it) } ?: Either.Right(Unit)
}.flatMap {
wrapStorageRequest {
conversationDAO.clearProposalTimer(idMapper.toCryptoModel(groupID))
}
private suspend fun commitPendingProposalsWithoutRetry(
groupID: GroupID
): Either<CoreFailure, Unit> = withContext(serialDispatcher) {
getPendingCommitBundle(groupID).flatMap {
if (it != null) {
sendCommitBundle(groupID, it)
} else {
Either.Right(Unit)
}
}.flatMap {
wrapStorageRequest {
conversationDAO.clearProposalTimer(idMapper.toCryptoModel(groupID))
}
}
}

private suspend fun getPendingCommitBundle(
groupID: GroupID
): Either<CoreFailure, CommitBundle?> =
mlsClientProvider.getMLSClient().flatMap { mlsClient ->
wrapMLSRequest {
mlsClient.commitPendingProposals(idMapper.toCryptoModel(groupID))
}.onSuccess { commitBundle ->
commitBundle?.crlNewDistributionPoints?.let {
checkRevocationList(it)
}
}
}

override suspend fun setProposalTimer(timer: ProposalTimer, inMemory: Boolean) {
if (inMemory) {
Expand All @@ -421,32 +436,29 @@ internal class MLSConversationDataSource(
retryOnStaleMessage: Boolean
): Either<CoreFailure, Unit> = withContext(serialDispatcher) {
commitPendingProposals(groupID).flatMap {
retryOnCommitFailure(groupID, retryOnStaleMessage = retryOnStaleMessage) {
keyPackageRepository.claimKeyPackages(userIdList).flatMap { keyPackageResult ->
val keyPackages = keyPackageResult.successfullyFetchedKeyPackages
val usersMissingKeyPackages = keyPackageResult.usersWithoutKeyPackagesAvailable
if (usersMissingKeyPackages.isNotEmpty()) {
return@retryOnCommitFailure Either.Left(CoreFailure.MissingKeyPackages(usersMissingKeyPackages))
produceAndSendCommitWithRetry(groupID, retryOnStaleMessage = retryOnStaleMessage) {
keyPackageRepository.claimKeyPackages(userIdList).flatMap { result ->
if (result.usersWithoutKeyPackagesAvailable.isNotEmpty()) {
Either.Left(CoreFailure.MissingKeyPackages(result.usersWithoutKeyPackagesAvailable))
} else {
Either.Right(result)
}
mlsClientProvider.getMLSClient().flatMap { mlsClient ->
val clientKeyPackageList = keyPackages.map { it.keyPackage.decodeBase64Bytes() }

wrapMLSRequest {
if (userIdList.isEmpty()) {
// We are creating a group with only our self client which technically
// doesn't need be added with a commit, but our backend API requires one,
// so we create a commit by updating our key material.
mlsClient.updateKeyingMaterial(idMapper.toCryptoModel(groupID))
} else {
mlsClient.addMember(idMapper.toCryptoModel(groupID), clientKeyPackageList)
}
}.flatMap { commitBundle ->
commitBundle?.crlNewDistributionPoints?.let {
checkRevocationList(it)
}
commitBundle?.let {
sendCommitBundle(groupID, it)
} ?: Either.Right(Unit)
}.flatMap { result ->
val keyPackages = result.successfullyFetchedKeyPackages
val clientKeyPackageList = keyPackages.map { it.keyPackage.decodeBase64Bytes() }

wrapMLSRequest {
if (userIdList.isEmpty()) {
// We are creating a group with only our self client which technically
// doesn't need be added with a commit, but our backend API requires one,
// so we create a commit by updating our key material.
updateKeyingMaterial(idMapper.toCryptoModel(groupID))
} else {
addMember(idMapper.toCryptoModel(groupID), clientKeyPackageList)
}
}.onSuccess { commitBundle ->
commitBundle?.crlNewDistributionPoints?.let { revocationList ->
checkRevocationList(revocationList)
}
}
}
Expand All @@ -459,7 +471,7 @@ internal class MLSConversationDataSource(
userIdList: List<UserId>
): Either<CoreFailure, Unit> = withContext(serialDispatcher) {
commitPendingProposals(groupID).flatMap {
retryOnCommitFailure(groupID) {
produceAndSendCommitWithRetry(groupID) {
wrapApiRequest { clientApi.listClientsOfUsers(userIdList.map { it.toApi() }) }.map { userClientsList ->
val usersCryptoQualifiedClientIDs = userClientsList.flatMap { userClients ->
userClients.value.map { userClient ->
Expand All @@ -469,12 +481,8 @@ internal class MLSConversationDataSource(
)
}
}
return@retryOnCommitFailure mlsClientProvider.getMLSClient().flatMap { mlsClient ->
wrapMLSRequest {
mlsClient.removeMember(idMapper.toCryptoModel(groupID), usersCryptoQualifiedClientIDs)
}.flatMap {
sendCommitBundle(groupID, it)
}
return@produceAndSendCommitWithRetry wrapMLSRequest {
removeMember(idMapper.toCryptoModel(groupID), usersCryptoQualifiedClientIDs)
}
}
}
Expand All @@ -486,19 +494,15 @@ internal class MLSConversationDataSource(
clientIdList: List<QualifiedClientID>
): Either<CoreFailure, Unit> = withContext(serialDispatcher) {
commitPendingProposals(groupID).flatMap {
retryOnCommitFailure(groupID, retryOnClientMismatch = false) {
produceAndSendCommitWithRetry(groupID, retryOnClientMismatch = false) {
val qualifiedClientIDs = clientIdList.map { userClient ->
CryptoQualifiedClientId(
userClient.clientId.value,
userClient.userId.toCrypto()
)
}
return@retryOnCommitFailure mlsClientProvider.getMLSClient().flatMap { mlsClient ->
wrapMLSRequest {
mlsClient.removeMember(groupID.toCrypto(), qualifiedClientIDs)
}.flatMap {
sendCommitBundle(groupID, it)
}
return@produceAndSendCommitWithRetry wrapMLSRequest {
removeMember(groupID.toCrypto(), qualifiedClientIDs)
}
}
}
Expand Down Expand Up @@ -654,76 +658,147 @@ internal class MLSConversationDataSource(
}
}

private suspend fun retryOnCommitFailure(
/**
* Takes an operation that generates a commit, performs it and sends the commit to remote.
* For convenience, it provides a MLSClient scope within the [operation].
* In case of failure, will follow [CoreFailure.getStrategy], retrying the [operation], retrying the sending of the commit, or just
* aborting.
* If the [operation] produces a null commit, will skip the sending of the commit and just return success.
*
* @param groupID The ID of the group to send the commit for.
* @param retryOnClientMismatch Whether to retry if a client mismatch occurs. Default is true.
* @param retryOnStaleMessage Whether to retry if a stale message occurs. Default is true.
* @param operation The operation to perform, which should return an [Either] containing a [CoreFailure] or [CommitBundle].
* @return An [Either] containing a [CoreFailure] or [Unit], indicating whether the operation was successful.
* @see produceAndSendCommitWithRetryAndResult
*/
private suspend fun produceAndSendCommitWithRetry(
groupID: GroupID,
retryOnClientMismatch: Boolean = true,
retryOnStaleMessage: Boolean = true,
operation: suspend () -> Either<CoreFailure, Unit>
) =
operation()
.flatMapLeft {
operation: suspend MLSClient.() -> Either<CoreFailure, CommitBundle?>
): Either<CoreFailure, Unit> = mlsClientProvider.getMLSClient().flatMap { mlsClient ->
produceAndSendCommitWithRetryAndResult(
groupID = groupID,
retryOnClientMismatch = retryOnClientMismatch,
retryOnStaleMessage = retryOnStaleMessage
) {
mlsClient.operation().map { CommitOperationResult(it, Unit) }
}
}.map { Unit }

/**
* Takes an operation that generates a commit, performs it and sends the commit to remote.
* This allows returning a value with the produced commit.
* For convenience, it provides a MLSClient scope within the [operation].
* In case of success, will return the latest result of the [operation], including the last result obtained during a retry.
* In case of failure, will follow [CoreFailure.getStrategy], retrying the [operation], retrying the sending of the commit, or just
* aborting.
* If the [operation] produces a null commit, will skip the sending of the commit and just return success.
*
* @param groupID The ID of the group to send the commit for.
* @param retryOnClientMismatch Whether to retry if a client mismatch occurs. Default is true.
* @param retryOnStaleMessage Whether to retry if a stale message occurs. Default is true.
* @param operation The operation to perform, which should return an [Either] containing a [CoreFailure] or [CommitOperationResult].
* @return An [Either] containing a [CoreFailure] or [Unit], indicating whether the operation was successful.
* @see produceAndSendCommitWithRetry
*/
private suspend fun <T> produceAndSendCommitWithRetryAndResult(
groupID: GroupID,
retryOnClientMismatch: Boolean = true,
retryOnStaleMessage: Boolean = true,
operation: suspend MLSClient.() -> Either<CoreFailure, CommitOperationResult<T>>
): Either<CoreFailure, T> = mlsClientProvider.getMLSClient().flatMap { mlsClient ->
mlsClient.operation().fold({
kaliumLogger.w("Failure to produce commit. Aborting retry.")
// Failure to generate commit. Nothing to retry
Either.Left(it)
}, { operationResult ->
// Try sending the produced commit (or skip if null), and return the produced result
val commitBundle = operationResult.commitBundle ?: return@fold Either.Right(operationResult.result)
sendCommitBundle(groupID, commitBundle).map {
operationResult
}.flatMapLeft { failure ->
handleCommitFailure(
failure = it,
failure = failure,
groupID = groupID,
currentOperationResult = operationResult,
remainingAttempts = 2,
retryOnClientMismatch = retryOnClientMismatch,
retryOnStaleMessage = retryOnStaleMessage,
retryOperation = operation
)
}
) { mlsClient.operation() }
}.map { it.result }
})
}

private suspend fun handleCommitFailure(
private suspend fun <T> handleCommitFailure(
failure: CoreFailure,
groupID: GroupID,
currentOperationResult: CommitOperationResult<T>,
remainingAttempts: Int,
retryOnClientMismatch: Boolean,
retryOnStaleMessage: Boolean,
retryOperation: suspend () -> Either<CoreFailure, Unit>
): Either<CoreFailure, Unit> {
return when (
retryOperation: suspend () -> Either<CoreFailure, CommitOperationResult<T>>
): Either<CoreFailure, CommitOperationResult<T>> = // Handle error in case the sending fails
when (
failure.getStrategy(
remainingAttempts = remainingAttempts,
retryOnClientMismatch = retryOnClientMismatch,
retryOnStaleMessage = retryOnStaleMessage
)
) {
CommitStrategy.KEEP_AND_RETRY -> keepCommitAndRetry(groupID)
CommitStrategy.DISCARD_AND_RETRY -> discardCommitAndRetry(groupID, retryOperation)
CommitStrategy.ABORT -> return discardCommit(groupID).flatMap { Either.Left(failure) }
}.flatMapLeft {
handleCommitFailure(
failure = it,
groupID = groupID,
remainingAttempts = remainingAttempts - 1,
retryOnClientMismatch = retryOnClientMismatch,
retryOnStaleMessage = retryOnStaleMessage,
retryOperation = retryOperation
)
CommitStrategy.KEEP_AND_RETRY -> {
// If we keep the commit, and resending it works, return the previous result
keepCommitAndRetry(groupID).map { currentOperationResult }.flatMapLeft {
handleCommitFailure(
failure = it,
groupID = groupID,
currentOperationResult = currentOperationResult,
remainingAttempts = remainingAttempts - 1,
retryOnClientMismatch = retryOnClientMismatch,
retryOnStaleMessage = retryOnStaleMessage,
retryOperation = retryOperation
)
}
}

CommitStrategy.DISCARD_AND_RETRY -> {
// In case of DISCARD AND RETRY, discard pending commits and retry the operation, sending the new commit
kaliumLogger.w("Discarding failed commit and retrying operation")
discardCommitForRetrying(groupID).flatMap { retryOperation() }.flatMap { newResult ->
val commitBundle = newResult.commitBundle ?: return@flatMap Either.Right(newResult)
sendCommitBundle(groupID, commitBundle).map { newResult }.flatMapLeft {
handleCommitFailure(
failure = it,
groupID = groupID,
currentOperationResult = newResult,
remainingAttempts = remainingAttempts - 1,
retryOnClientMismatch = retryOnClientMismatch,
retryOnStaleMessage = retryOnStaleMessage,
retryOperation = retryOperation
)
}
}
}

CommitStrategy.ABORT -> discardCommit(groupID).flatMap { Either.Left(failure) }
}
}

private suspend fun keepCommitAndRetry(groupID: GroupID): Either<CoreFailure, Unit> {
kaliumLogger.w("Migrating failed commit to new epoch and re-trying.")

return syncManager.waitUntilLiveOrFailure().flatMap {
internalCommitPendingProposals(groupID)
commitPendingProposalsWithoutRetry(groupID)
}
}

private suspend fun discardCommitAndRetry(
private suspend fun discardCommitForRetrying(
groupID: GroupID,
operation: suspend () -> Either<CoreFailure, Unit>
): Either<CoreFailure, Unit> {
kaliumLogger.w("Discarding failed commit and retry by re-generating the commit.")

return mlsClientProvider.getMLSClient().flatMap { mlsClient ->
wrapMLSRequest {
mlsClient.clearPendingCommit(idMapper.toCryptoModel(groupID))
}.flatMap {
syncManager.waitUntilLiveOrFailure().flatMap {
operation()
}
}
): Either<CoreFailure, Unit> = mlsClientProvider.getMLSClient().flatMap { mlsClient ->
wrapMLSRequest {
mlsClient.clearPendingCommit(idMapper.toCryptoModel(groupID))
}.flatMap {
syncManager.waitUntilLiveOrFailure()
}
}

Expand All @@ -750,4 +825,6 @@ internal class MLSConversationDataSource(
}
}
}

private data class CommitOperationResult<T>(val commitBundle: CommitBundle?, val result: T)
}

0 comments on commit ded4598

Please sign in to comment.