Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Notification observation improvement [WPB-8776] #2753

Merged
merged 10 commits into from
May 14, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import com.wire.kalium.network.api.base.authenticated.message.MessagePriority
import com.wire.kalium.network.api.base.authenticated.message.QualifiedSendMessageResponse
import com.wire.kalium.network.exceptions.ProteusClientsChangedError
import com.wire.kalium.persistence.dao.conversation.ConversationEntity
import com.wire.kalium.persistence.dao.message.InsertMessageResult
import com.wire.kalium.persistence.dao.message.MessageDAO
import com.wire.kalium.persistence.dao.message.MessageEntity
import com.wire.kalium.persistence.dao.message.MessageEntityContent
Expand All @@ -62,7 +63,6 @@ import com.wire.kalium.util.DelicateKaliumApi
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapLatest
import kotlinx.datetime.Instant

@Suppress("TooManyFunctions")
Expand All @@ -79,7 +79,7 @@ internal interface MessageRepository {
message: Message.Standalone,
updateConversationReadDate: Boolean = false,
updateConversationModifiedDate: Boolean = false,
): Either<CoreFailure, Unit>
): Either<CoreFailure, InsertMessageResult>

suspend fun persistSystemMessageToAllConversations(
message: Message.System
Expand Down Expand Up @@ -118,7 +118,7 @@ internal interface MessageRepository {
conversationIdList: List<ConversationId>
): Either<StorageFailure, Map<ConversationId, Message>>

suspend fun getNotificationMessage(messageSizePerConversation: Int = 10): Either<CoreFailure, Flow<List<LocalNotification>>>
suspend fun getNotificationMessage(messageSizePerConversation: Int = 10): Either<CoreFailure, List<LocalNotification>>

suspend fun getMessagesByConversationIdAndVisibilityAfterDate(
conversationId: ConversationId,
Expand Down Expand Up @@ -301,21 +301,20 @@ internal class MessageDataSource internal constructor(
@OptIn(ExperimentalCoroutinesApi::class)
override suspend fun getNotificationMessage(
messageSizePerConversation: Int
): Either<CoreFailure, Flow<List<LocalNotification>>> = wrapStorageRequest {
messageDAO.getNotificationMessage().mapLatest { notificationEntities ->
notificationEntities.groupBy { it.conversationId }
.map { (conversationId, messages) ->
LocalNotification.Conversation(
// todo: needs some clean up!
id = conversationId.toModel(),
conversationName = messages.first().conversationName,
messages = messages.mapNotNull { message ->
messageMapper.fromMessageToLocalNotificationMessage(message)
},
isOneToOneConversation = messages.first().conversationType == ConversationEntity.Type.ONE_ON_ONE
)
}
}
): Either<CoreFailure, List<LocalNotification>> = wrapStorageRequest {
val notificationEntities = messageDAO.getNotificationMessage()
notificationEntities.groupBy { it.conversationId }
.map { (conversationId, messages) ->
LocalNotification.Conversation(
// todo: needs some clean up!
id = conversationId.toModel(),
conversationName = messages.first().conversationName,
messages = messages.mapNotNull { message ->
messageMapper.fromMessageToLocalNotificationMessage(message)
},
isOneToOneConversation = messages.first().conversationType == ConversationEntity.Type.ONE_ON_ONE
)
}
}

@DelicateKaliumApi(
Expand All @@ -326,7 +325,7 @@ internal class MessageDataSource internal constructor(
message: Message.Standalone,
updateConversationReadDate: Boolean,
updateConversationModifiedDate: Boolean,
): Either<CoreFailure, Unit> = wrapStorageRequest {
): Either<CoreFailure, InsertMessageResult> = wrapStorageRequest {
messageDAO.insertOrIgnoreMessage(
messageMapper.fromMessageToEntity(message),
updateConversationReadDate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@ package com.wire.kalium.logic.data.message
import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.data.conversation.Conversation
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.data.notification.NotificationEventsManager
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.fold
import com.wire.kalium.logic.functional.map
import com.wire.kalium.logic.functional.onSuccess
import com.wire.kalium.persistence.dao.message.InsertMessageResult

/**
* Internal UseCase that should be used instead of MessageRepository.persistMessage(Message)
Expand All @@ -34,16 +38,24 @@ interface PersistMessageUseCase {

internal class PersistMessageUseCaseImpl(
private val messageRepository: MessageRepository,
private val selfUserId: UserId
private val selfUserId: UserId,
private val notificationEventsManager: NotificationEventsManager
) : PersistMessageUseCase {
override suspend operator fun invoke(message: Message.Standalone): Either<CoreFailure, Unit> {
val modifiedMessage = getExpectsReadConfirmationFromMessage(message)

val isSelfSender = message.isSelfTheSender(selfUserId)
return messageRepository.persistMessage(
message = modifiedMessage,
updateConversationReadDate = message.isSelfTheSender(selfUserId),
updateConversationReadDate = isSelfSender,
updateConversationModifiedDate = message.content.shouldUpdateConversationOrder()
)
).onSuccess {
val isConversationMuted = it == InsertMessageResult.INSERTED_INTO_MUTED_CONVERSATION

if (!isConversationMuted && !isSelfSender && message.content.shouldNotifyUser()) {
notificationEventsManager.scheduleRegularNotificationChecking()
}
Comment on lines +55 to +57
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🥇

}.map { }
}

private fun Message.isSelfTheSender(selfUserId: UserId) = senderUserId == selfUserId
Expand Down Expand Up @@ -115,4 +127,58 @@ internal class PersistMessageUseCaseImpl(
is MessageContent.MemberChange.RemovedFromTeam -> false
is MessageContent.TeamMemberRemoved -> false
}

@Suppress("ComplexMethod")
private fun MessageContent.shouldNotifyUser(): Boolean =
when (this) {
is MessageContent.Text,
is MessageContent.Asset,
is MessageContent.Knock,
is MessageContent.RestrictedAsset,
is MessageContent.MissedCall,
is MessageContent.Location -> true

is MessageContent.MemberChange.Added,
is MessageContent.MemberChange.Removed,
is MessageContent.Calling,
is MessageContent.DeleteMessage,
is MessageContent.TextEdited,
is MessageContent.DeleteForMe,
is MessageContent.Unknown,
is MessageContent.Availability,
is MessageContent.FailedDecryption,
is MessageContent.Ignored,
is MessageContent.LastRead,
is MessageContent.Reaction,
is MessageContent.Cleared,
is MessageContent.ConversationRenamed,
is MessageContent.Receipt,
is MessageContent.ClientAction,
is MessageContent.CryptoSessionReset,
is MessageContent.NewConversationReceiptMode,
is MessageContent.ConversationReceiptModeChanged,
is MessageContent.HistoryLost,
is MessageContent.HistoryLostProtocolChanged,
is MessageContent.ConversationMessageTimerChanged,
is MessageContent.MemberChange.CreationAdded,
is MessageContent.MemberChange.FailedToAdd,
is MessageContent.ConversationCreated,
is MessageContent.MLSWrongEpochWarning,
MessageContent.ConversationDegradedMLS,
MessageContent.ConversationVerifiedMLS,
MessageContent.ConversationDegradedProteus,
MessageContent.ConversationVerifiedProteus,
is MessageContent.Composite,
is MessageContent.ButtonAction,
is MessageContent.ButtonActionConfirmation,
is MessageContent.MemberChange.FederationRemoved,
is MessageContent.FederationStopped.ConnectionRemoved,
is MessageContent.FederationStopped.Removed,
is MessageContent.ConversationProtocolChanged,
alexandreferris marked this conversation as resolved.
Show resolved Hide resolved
is MessageContent.ConversationProtocolChangedDuringACall,
is MessageContent.ConversationStartedUnverifiedWarning,
is MessageContent.LegalHold,
is MessageContent.MemberChange.RemovedFromTeam,
is MessageContent.TeamMemberRemoved -> false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,62 +16,103 @@
* along with this program. If not, see http://www.gnu.org/licenses/.
*/

package com.wire.kalium.logic.feature.message
package com.wire.kalium.logic.data.notification

import com.wire.kalium.logic.data.conversation.Conversation
import com.wire.kalium.logic.data.event.Event
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.message.Message
import com.wire.kalium.logic.data.message.MessageContent
import com.wire.kalium.logic.data.notification.LocalNotification
import com.wire.kalium.logic.data.user.User
import com.wire.kalium.logic.di.MapperProvider
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow

/**
* This singleton allow us to queue ephemeral notifications from different user flows.
* Ideally we should have logic that allows to mark messages as notified, but this will act for cases when we need to notify the user on
* information we have not persisted or that is not available anymore.
* This singleton allow us to queue checking for new regular notifications AND queue ephemeral notifications from different user flows.
*/
object EphemeralEventsNotificationManagerImpl : EphemeralEventsNotificationManager {
object NotificationEventsManagerImpl : NotificationEventsManager {

private val mapper by lazy { MapperProvider.localNotificationMessageMapper() }

private val notifications = MutableSharedFlow<LocalNotification>()
private val ephemeralNotifications = MutableSharedFlow<LocalNotification>()
private val regularNotificationChecking = MutableSharedFlow<Unit>()

override suspend fun observeEphemeralNotifications(): Flow<LocalNotification> = notifications
override suspend fun observeEphemeralNotifications(): Flow<LocalNotification> = ephemeralNotifications

override suspend fun scheduleDeleteConversationNotification(ephemeralConversationNotification: EphemeralConversationNotification) {
val localNotification = mapper.fromConversationEventToLocalNotification(
ephemeralConversationNotification.conversationEvent,
ephemeralConversationNotification.conversation,
ephemeralConversationNotification.user
)
notifications.emit(localNotification)
ephemeralNotifications.emit(localNotification)
}

override suspend fun scheduleDeleteMessageNotification(message: Message) {
val localNotification = mapper.fromMessageToMessageDeletedLocalNotification(message)
notifications.emit(localNotification)
ephemeralNotifications.emit(localNotification)
}

override suspend fun scheduleEditMessageNotification(message: Message, messageContent: MessageContent.TextEdited) {
val localNotification = mapper.fromMessageToMessageEditedLocalNotification(message, messageContent)
notifications.emit(localNotification)
ephemeralNotifications.emit(localNotification)
}

override suspend fun scheduleConversationSeenNotification(conversationId: ConversationId) {
val localNotification = mapper.toConversationSeen(conversationId)
notifications.emit(localNotification)
ephemeralNotifications.emit(localNotification)
}

override suspend fun scheduleRegularNotificationChecking() {
regularNotificationChecking.emit(Unit)
}

override suspend fun observeRegularNotificationsChecking(): Flow<Unit> = regularNotificationChecking
}

interface EphemeralEventsNotificationManager {
interface NotificationEventsManager {
/**
* Ideally we should have logic that allows to mark messages as notified,
* but this will act for cases when we need to notify the user on
* information we have not persisted or that is not available anymore.
*
* @return [Flow] of [LocalNotification] that is not stored in DB
* and no chance to get in any other way than just emit when it's received
*/
suspend fun observeEphemeralNotifications(): Flow<LocalNotification>

/**
* Schedule the notification that some conversation was deleted
* (if the notification about that conversation is displayed it should be hidden)
*/
suspend fun scheduleDeleteConversationNotification(ephemeralConversationNotification: EphemeralConversationNotification)

/**
* Schedule the notification that some message was deleted (if the notification about that message is displayed it should be hidden)
*/
suspend fun scheduleDeleteMessageNotification(message: Message)

/**
* Schedule the notification that some message was edited (if the notification about that message is displayed it should be edited)
*/
suspend fun scheduleEditMessageNotification(message: Message, messageContent: MessageContent.TextEdited)

/**
* Schedule the notification that informs that some conversation been seen by self-user on another device.
* (means that notifications about that conversation can be hidden)
*/
suspend fun scheduleConversationSeenNotification(conversationId: ConversationId)

/**
* Schedule re-checking of the regular notifications - notifications that are persisted and can be got by the DB-query.
*/
suspend fun scheduleRegularNotificationChecking()

/**
* @return [Flow] that emits every time when new message/event that user should be notified about came and persisted
*/
suspend fun observeRegularNotificationsChecking(): Flow<Unit>
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ import com.wire.kalium.logic.feature.legalhold.UpdateSelfClientCapabilityToLegal
import com.wire.kalium.logic.feature.legalhold.UpdateSelfClientCapabilityToLegalHoldConsentUseCaseImpl
import com.wire.kalium.logic.feature.message.AddSystemMessageToAllConversationsUseCase
import com.wire.kalium.logic.feature.message.AddSystemMessageToAllConversationsUseCaseImpl
import com.wire.kalium.logic.feature.message.EphemeralEventsNotificationManagerImpl
import com.wire.kalium.logic.data.notification.NotificationEventsManagerImpl
import com.wire.kalium.logic.feature.message.MessageScope
import com.wire.kalium.logic.feature.message.MessageSendingScheduler
import com.wire.kalium.logic.feature.message.PendingProposalScheduler
Expand Down Expand Up @@ -847,7 +847,7 @@ class UserSessionScope internal constructor(
)

val persistMessage: PersistMessageUseCase
get() = PersistMessageUseCaseImpl(messageRepository, userId)
get() = PersistMessageUseCaseImpl(messageRepository, userId, NotificationEventsManagerImpl)

private val addSystemMessageToAllConversationsUseCase: AddSystemMessageToAllConversationsUseCase
get() = AddSystemMessageToAllConversationsUseCaseImpl(messageRepository, userId)
Expand Down Expand Up @@ -1299,20 +1299,20 @@ class UserSessionScope internal constructor(
callManager,
persistMessage,
persistReaction,
MessageTextEditHandlerImpl(messageRepository, EphemeralEventsNotificationManagerImpl),
MessageTextEditHandlerImpl(messageRepository, NotificationEventsManagerImpl),
LastReadContentHandlerImpl(
conversationRepository,
userId,
isMessageSentInSelfConversation,
EphemeralEventsNotificationManagerImpl
NotificationEventsManagerImpl
),
ClearConversationContentHandlerImpl(
conversationRepository,
userId,
isMessageSentInSelfConversation,
),
DeleteForMeHandlerImpl(messageRepository, isMessageSentInSelfConversation),
DeleteMessageHandlerImpl(messageRepository, assetRepository, EphemeralEventsNotificationManagerImpl, userId),
DeleteMessageHandlerImpl(messageRepository, assetRepository, NotificationEventsManagerImpl, userId),
messageEncoder,
receiptMessageHandler,
buttonActionConfirmationHandler,
Expand Down Expand Up @@ -1350,7 +1350,9 @@ class UserSessionScope internal constructor(
)
private val deletedConversationHandler: DeletedConversationEventHandler
get() = DeletedConversationEventHandlerImpl(
userRepository, conversationRepository, EphemeralEventsNotificationManagerImpl
userRepository,
conversationRepository,
NotificationEventsManagerImpl
)
private val memberJoinHandler: MemberJoinEventHandler
get() = MemberJoinEventHandlerImpl(
Expand Down
Loading
Loading