Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Notification observation improvement [WPB-8776] #2753

Merged
merged 10 commits into from
May 14, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import com.wire.kalium.network.api.base.authenticated.message.MessagePriority
import com.wire.kalium.network.api.base.authenticated.message.QualifiedSendMessageResponse
import com.wire.kalium.network.exceptions.ProteusClientsChangedError
import com.wire.kalium.persistence.dao.conversation.ConversationEntity
import com.wire.kalium.persistence.dao.message.InsertMessageResult
import com.wire.kalium.persistence.dao.message.MessageDAO
import com.wire.kalium.persistence.dao.message.MessageEntity
import com.wire.kalium.persistence.dao.message.MessageEntityContent
Expand All @@ -62,7 +63,6 @@ import com.wire.kalium.util.DelicateKaliumApi
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapLatest
import kotlinx.datetime.Instant

@Suppress("TooManyFunctions")
Expand All @@ -79,7 +79,7 @@ internal interface MessageRepository {
message: Message.Standalone,
updateConversationReadDate: Boolean = false,
updateConversationModifiedDate: Boolean = false,
): Either<CoreFailure, Unit>
): Either<CoreFailure, InsertMessageResult>

suspend fun persistSystemMessageToAllConversations(
message: Message.System
Expand Down Expand Up @@ -118,7 +118,7 @@ internal interface MessageRepository {
conversationIdList: List<ConversationId>
): Either<StorageFailure, Map<ConversationId, Message>>

suspend fun getNotificationMessage(messageSizePerConversation: Int = 10): Either<CoreFailure, Flow<List<LocalNotification>>>
suspend fun getNotificationMessage(messageSizePerConversation: Int = 10): Either<CoreFailure, List<LocalNotification>>

suspend fun getMessagesByConversationIdAndVisibilityAfterDate(
conversationId: ConversationId,
Expand Down Expand Up @@ -301,21 +301,20 @@ internal class MessageDataSource internal constructor(
@OptIn(ExperimentalCoroutinesApi::class)
override suspend fun getNotificationMessage(
messageSizePerConversation: Int
): Either<CoreFailure, Flow<List<LocalNotification>>> = wrapStorageRequest {
messageDAO.getNotificationMessage().mapLatest { notificationEntities ->
notificationEntities.groupBy { it.conversationId }
.map { (conversationId, messages) ->
LocalNotification.Conversation(
// todo: needs some clean up!
id = conversationId.toModel(),
conversationName = messages.first().conversationName,
messages = messages.mapNotNull { message ->
messageMapper.fromMessageToLocalNotificationMessage(message)
},
isOneToOneConversation = messages.first().conversationType == ConversationEntity.Type.ONE_ON_ONE
)
}
}
): Either<CoreFailure, List<LocalNotification>> = wrapStorageRequest {
val notificationEntities = messageDAO.getNotificationMessage()
notificationEntities.groupBy { it.conversationId }
.map { (conversationId, messages) ->
LocalNotification.Conversation(
// todo: needs some clean up!
id = conversationId.toModel(),
conversationName = messages.first().conversationName,
messages = messages.mapNotNull { message ->
messageMapper.fromMessageToLocalNotificationMessage(message)
},
isOneToOneConversation = messages.first().conversationType == ConversationEntity.Type.ONE_ON_ONE
)
}
}

@DelicateKaliumApi(
Expand All @@ -326,7 +325,7 @@ internal class MessageDataSource internal constructor(
message: Message.Standalone,
updateConversationReadDate: Boolean,
updateConversationModifiedDate: Boolean,
): Either<CoreFailure, Unit> = wrapStorageRequest {
): Either<CoreFailure, InsertMessageResult> = wrapStorageRequest {
messageDAO.insertOrIgnoreMessage(
messageMapper.fromMessageToEntity(message),
updateConversationReadDate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@ package com.wire.kalium.logic.data.message
import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.data.conversation.Conversation
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.feature.message.NotificationEventsManager
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.fold
import com.wire.kalium.logic.functional.map
import com.wire.kalium.logic.functional.onSuccess
import com.wire.kalium.persistence.dao.message.InsertMessageResult

/**
* Internal UseCase that should be used instead of MessageRepository.persistMessage(Message)
Expand All @@ -34,16 +38,24 @@ interface PersistMessageUseCase {

internal class PersistMessageUseCaseImpl(
private val messageRepository: MessageRepository,
private val selfUserId: UserId
private val selfUserId: UserId,
private val notificationEventsManager: NotificationEventsManager
) : PersistMessageUseCase {
override suspend operator fun invoke(message: Message.Standalone): Either<CoreFailure, Unit> {
val modifiedMessage = getExpectsReadConfirmationFromMessage(message)

val isSelfSender = message.isSelfTheSender(selfUserId)
return messageRepository.persistMessage(
message = modifiedMessage,
updateConversationReadDate = message.isSelfTheSender(selfUserId),
updateConversationReadDate = isSelfSender,
updateConversationModifiedDate = message.content.shouldUpdateConversationOrder()
)
).onSuccess {
val isConversationMuted = it == InsertMessageResult.INSERTED_INTO_MUTED_CONVERSATION

if (!isConversationMuted && !isSelfSender && message.content.shouldNotifyUser()) {
notificationEventsManager.scheduleRegularNotificationChecking()
}
Comment on lines +55 to +57
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🥇

}.map { }
}

private fun Message.isSelfTheSender(selfUserId: UserId) = senderUserId == selfUserId
Expand Down Expand Up @@ -115,4 +127,58 @@ internal class PersistMessageUseCaseImpl(
is MessageContent.MemberChange.RemovedFromTeam -> false
is MessageContent.TeamMemberRemoved -> false
}

@Suppress("ComplexMethod")
private fun MessageContent.shouldNotifyUser(): Boolean =
when (this) {
is MessageContent.Text,
is MessageContent.Asset,
is MessageContent.Knock,
is MessageContent.RestrictedAsset,
is MessageContent.MissedCall,
is MessageContent.Location -> true

is MessageContent.MemberChange.Added,
is MessageContent.MemberChange.Removed,
is MessageContent.Calling,
is MessageContent.DeleteMessage,
is MessageContent.TextEdited,
is MessageContent.DeleteForMe,
is MessageContent.Unknown,
is MessageContent.Availability,
is MessageContent.FailedDecryption,
is MessageContent.Ignored,
is MessageContent.LastRead,
is MessageContent.Reaction,
is MessageContent.Cleared,
is MessageContent.ConversationRenamed,
is MessageContent.Receipt,
is MessageContent.ClientAction,
is MessageContent.CryptoSessionReset,
is MessageContent.NewConversationReceiptMode,
is MessageContent.ConversationReceiptModeChanged,
is MessageContent.HistoryLost,
is MessageContent.HistoryLostProtocolChanged,
is MessageContent.ConversationMessageTimerChanged,
is MessageContent.MemberChange.CreationAdded,
is MessageContent.MemberChange.FailedToAdd,
is MessageContent.ConversationCreated,
is MessageContent.MLSWrongEpochWarning,
MessageContent.ConversationDegradedMLS,
MessageContent.ConversationVerifiedMLS,
MessageContent.ConversationDegradedProteus,
MessageContent.ConversationVerifiedProteus,
is MessageContent.Composite,
is MessageContent.ButtonAction,
is MessageContent.ButtonActionConfirmation,
is MessageContent.MemberChange.FederationRemoved,
is MessageContent.FederationStopped.ConnectionRemoved,
is MessageContent.FederationStopped.Removed,
is MessageContent.ConversationProtocolChanged,
alexandreferris marked this conversation as resolved.
Show resolved Hide resolved
is MessageContent.ConversationProtocolChangedDuringACall,
is MessageContent.ConversationStartedUnverifiedWarning,
is MessageContent.LegalHold,
is MessageContent.MemberChange.RemovedFromTeam,
is MessageContent.TeamMemberRemoved -> false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ import com.wire.kalium.logic.feature.legalhold.UpdateSelfClientCapabilityToLegal
import com.wire.kalium.logic.feature.legalhold.UpdateSelfClientCapabilityToLegalHoldConsentUseCaseImpl
import com.wire.kalium.logic.feature.message.AddSystemMessageToAllConversationsUseCase
import com.wire.kalium.logic.feature.message.AddSystemMessageToAllConversationsUseCaseImpl
import com.wire.kalium.logic.feature.message.EphemeralEventsNotificationManagerImpl
import com.wire.kalium.logic.feature.message.NotificationEventsManagerImpl
import com.wire.kalium.logic.feature.message.MessageScope
import com.wire.kalium.logic.feature.message.MessageSendingScheduler
import com.wire.kalium.logic.feature.message.PendingProposalScheduler
Expand Down Expand Up @@ -847,7 +847,7 @@ class UserSessionScope internal constructor(
)

val persistMessage: PersistMessageUseCase
get() = PersistMessageUseCaseImpl(messageRepository, userId)
get() = PersistMessageUseCaseImpl(messageRepository, userId, NotificationEventsManagerImpl)

private val addSystemMessageToAllConversationsUseCase: AddSystemMessageToAllConversationsUseCase
get() = AddSystemMessageToAllConversationsUseCaseImpl(messageRepository, userId)
Expand Down Expand Up @@ -1299,20 +1299,20 @@ class UserSessionScope internal constructor(
callManager,
persistMessage,
persistReaction,
MessageTextEditHandlerImpl(messageRepository, EphemeralEventsNotificationManagerImpl),
MessageTextEditHandlerImpl(messageRepository, NotificationEventsManagerImpl),
LastReadContentHandlerImpl(
conversationRepository,
userId,
isMessageSentInSelfConversation,
EphemeralEventsNotificationManagerImpl
NotificationEventsManagerImpl
),
ClearConversationContentHandlerImpl(
conversationRepository,
userId,
isMessageSentInSelfConversation,
),
DeleteForMeHandlerImpl(messageRepository, isMessageSentInSelfConversation),
DeleteMessageHandlerImpl(messageRepository, assetRepository, EphemeralEventsNotificationManagerImpl, userId),
DeleteMessageHandlerImpl(messageRepository, assetRepository, NotificationEventsManagerImpl, userId),
messageEncoder,
receiptMessageHandler,
buttonActionConfirmationHandler,
Expand Down Expand Up @@ -1350,7 +1350,9 @@ class UserSessionScope internal constructor(
)
private val deletedConversationHandler: DeletedConversationEventHandler
get() = DeletedConversationEventHandlerImpl(
userRepository, conversationRepository, EphemeralEventsNotificationManagerImpl
userRepository,
conversationRepository,
NotificationEventsManagerImpl
)
private val memberJoinHandler: MemberJoinEventHandler
get() = MemberJoinEventHandlerImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,16 @@ import com.wire.kalium.logic.data.notification.LocalNotificationMessageMapper
import com.wire.kalium.logic.data.sync.IncrementalSyncRepository
import com.wire.kalium.logic.data.sync.IncrementalSyncStatus
import com.wire.kalium.logic.di.MapperProvider
import com.wire.kalium.logic.functional.fold
import com.wire.kalium.logic.functional.onlyRight
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.onStart

/**
* Get notifications for the current user
Expand All @@ -56,7 +58,7 @@ interface GetNotificationsUseCase {
internal class GetNotificationsUseCaseImpl internal constructor(
private val connectionRepository: ConnectionRepository,
private val messageRepository: MessageRepository,
private val deleteConversationNotificationsManager: EphemeralEventsNotificationManager,
private val notificationEventsManager: NotificationEventsManager,
private val incrementalSyncRepository: IncrementalSyncRepository,
private val localNotificationMessageMapper: LocalNotificationMessageMapper = MapperProvider.localNotificationMessageMapper()
) : GetNotificationsUseCase {
Expand All @@ -66,25 +68,31 @@ internal class GetNotificationsUseCaseImpl internal constructor(
override suspend operator fun invoke(): Flow<List<LocalNotification>> {
return incrementalSyncRepository.incrementalSyncState
.map { it != IncrementalSyncStatus.FetchingPendingEvents }
.distinctUntilChanged()
.flatMapLatest { isLive ->
if (isLive) {
merge(
messageRepository.getNotificationMessage().fold({ flowOf() }, { it }),
observeRegularNotifications(),
observeConnectionRequests(),
observeEphemeralNotifications()
)
} else {
observeEphemeralNotifications()
}.map { list ->
list.filter { it !is LocalNotification.Conversation || it.messages.isNotEmpty() }
}
.map { list ->
list.filter { it !is LocalNotification.Conversation || it.messages.isNotEmpty() }
}
}
.filter { it.isNotEmpty() }
}

private suspend fun observeRegularNotifications(): Flow<List<LocalNotification>> =
notificationEventsManager.observeRegularNotificationsChecking().debounce(NOTIFICATION_DEBOUNCE_MS)
.map { messageRepository.getNotificationMessage() }
.onStart { emit(messageRepository.getNotificationMessage()) }
.onlyRight()

private suspend fun observeEphemeralNotifications(): Flow<List<LocalNotification>> =
deleteConversationNotificationsManager.observeEphemeralNotifications().map { listOf(it) }
notificationEventsManager.observeEphemeralNotifications().map { listOf(it) }

private suspend fun observeConnectionRequests(): Flow<List<LocalNotification>> {
return connectionRepository.observeConnectionRequestsForNotification()
Expand All @@ -94,4 +102,8 @@ internal class GetNotificationsUseCaseImpl internal constructor(
.map { localNotificationMessageMapper.fromConnectionToLocalNotificationConversation(it) }
}
}

companion object {
private const val NOTIFICATION_DEBOUNCE_MS = 50L
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ class MessageScope internal constructor(
)

val persistMessage: PersistMessageUseCase
get() = PersistMessageUseCaseImpl(messageRepository, selfUserId)
get() = PersistMessageUseCaseImpl(messageRepository, selfUserId, NotificationEventsManagerImpl)

val sendTextMessage: SendTextMessageUseCase
get() = SendTextMessageUseCase(
Expand Down Expand Up @@ -346,7 +346,7 @@ class MessageScope internal constructor(
connectionRepository = connectionRepository,
messageRepository = messageRepository,
incrementalSyncRepository = incrementalSyncRepository,
deleteConversationNotificationsManager = EphemeralEventsNotificationManagerImpl
notificationEventsManager = NotificationEventsManagerImpl
)

internal val sendConfirmation: SendConfirmationUseCase
Expand Down
Loading
Loading