Skip to content

Commit

Permalink
Subscribe to group config messages (#969)
Browse files Browse the repository at this point in the history
  • Loading branch information
SessionHero01 authored Feb 24, 2025
1 parent ad7792f commit 01b1d26
Show file tree
Hide file tree
Showing 15 changed files with 189 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -235,14 +235,14 @@ class ConfigUploader @Inject constructor(
auth,
snode,
push,
Namespace.CLOSED_GROUP_MEMBERS()
Namespace.GROUP_MEMBERS()
)
}
}

val infoConfigHashTask = infoPush?.let { push ->
async {
push to pushConfig(auth, snode, push, Namespace.CLOSED_GROUP_INFO())
push to pushConfig(auth, snode, push, Namespace.GROUP_INFO())
}
}

Expand All @@ -252,7 +252,7 @@ class ConfigUploader @Inject constructor(
snode = snode,
publicKey = auth.accountId.hexString,
request = SnodeAPI.buildAuthenticatedStoreBatchInfo(
Namespace.ENCRYPTION_KEYS(),
Namespace.GROUP_KEYS(),
SnodeMessage(
auth.accountId.hexString,
Base64.encodeBytes(push),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ class GroupManagerV2Impl @Inject constructor(
val memberKey = configs.groupKeys.supplementFor(newMembers.map { it.hexString })
batchRequests.add(
SnodeAPI.buildAuthenticatedStoreBatchInfo(
namespace = Namespace.ENCRYPTION_KEYS(),
namespace = Namespace.GROUP_KEYS(),
message = SnodeMessage(
recipient = group.hexString,
data = Base64.encodeBytes(memberKey),
Expand Down
56 changes: 11 additions & 45 deletions app/src/main/java/org/thoughtcrime/securesms/groups/GroupPoller.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.launch
import kotlinx.coroutines.supervisorScope
import network.loki.messenger.libsession_util.util.Sodium
import org.session.libsession.database.StorageProtocol
import org.session.libsession.messaging.groups.GroupManagerV2
import org.session.libsession.messaging.jobs.BatchMessageReceiveJob
Expand All @@ -30,7 +29,6 @@ import org.session.libsession.utilities.getGroup
import org.session.libsignal.database.LokiAPIDatabaseProtocol
import org.session.libsignal.exceptions.NonRetryableException
import org.session.libsignal.utilities.AccountId
import org.session.libsignal.utilities.IdPrefix
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.Namespace
import org.session.libsignal.utilities.Snode
Expand All @@ -48,6 +46,7 @@ class GroupPoller(
private val lokiApiDatabase: LokiAPIDatabaseProtocol,
private val clock: SnodeClock,
private val appVisibilityManager: AppVisibilityManager,
private val groupRevokedMessageHandler: GroupRevokedMessageHandler,
) {
companion object {
private const val POLL_INTERVAL = 3_000L
Expand Down Expand Up @@ -239,7 +238,7 @@ class GroupPoller(
val lastHash = lokiApiDatabase.getLastMessageHashValue(
snode,
groupId.hexString,
Namespace.CLOSED_GROUP_MESSAGES()
Namespace.GROUP_MESSAGES()
).orEmpty()

Log.d(TAG, "Retrieving group message since lastHash = $lastHash")
Expand All @@ -250,17 +249,17 @@ class GroupPoller(
request = SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
lastHash = lastHash,
auth = groupAuth,
namespace = Namespace.CLOSED_GROUP_MESSAGES(),
namespace = Namespace.GROUP_MESSAGES(),
maxSize = null,
),
responseType = Map::class.java
)
}

val groupConfigRetrieval = listOf(
Namespace.ENCRYPTION_KEYS(),
Namespace.CLOSED_GROUP_INFO(),
Namespace.CLOSED_GROUP_MEMBERS()
Namespace.GROUP_KEYS(),
Namespace.GROUP_INFO(),
Namespace.GROUP_MEMBERS()
).map { ns ->
async {
SnodeAPI.sendBatchRequest(
Expand Down Expand Up @@ -288,9 +287,9 @@ class GroupPoller(
val result = runCatching {
val (keysMessage, infoMessage, membersMessage) = groupConfigRetrieval.map { it.await() }
handleGroupConfigMessages(keysMessage, infoMessage, membersMessage)
saveLastMessageHash(snode, keysMessage, Namespace.ENCRYPTION_KEYS())
saveLastMessageHash(snode, infoMessage, Namespace.CLOSED_GROUP_INFO())
saveLastMessageHash(snode, membersMessage, Namespace.CLOSED_GROUP_MEMBERS())
saveLastMessageHash(snode, keysMessage, Namespace.GROUP_KEYS())
saveLastMessageHash(snode, infoMessage, Namespace.GROUP_INFO())
saveLastMessageHash(snode, membersMessage, Namespace.GROUP_MEMBERS())

groupExpired = configFactoryProtocol.withGroupConfigs(groupId) {
it.groupKeys.size() == 0
Expand Down Expand Up @@ -370,40 +369,7 @@ class GroupPoller(
}

private suspend fun handleRevoked(messages: List<RetrieveMessageResponse.Message>) {
messages.forEach { msg ->
val decoded = configFactoryProtocol.decryptForUser(
msg.data,
Sodium.KICKED_DOMAIN,
groupId,
)

if (decoded != null) {
// The message should be in the format of "<sessionIdPubKeyBinary><messageGenerationASCII>",
// where the pub key is 32 bytes, so we need to have at least 33 bytes of data
if (decoded.size < 33) {
Log.w(TAG, "Received an invalid kicked message, expecting at least 33 bytes, got ${decoded.size}")
return@forEach
}

val sessionId = AccountId(IdPrefix.STANDARD, decoded.copyOfRange(0, 32))
val messageGeneration = decoded.copyOfRange(32, decoded.size).decodeToString().toIntOrNull()
if (messageGeneration == null) {
Log.w(TAG, "Received an invalid kicked message: missing message generation")
return@forEach
}

val currentKeysGeneration = configFactoryProtocol.withGroupConfigs(groupId) {
it.groupKeys.currentGeneration()
}

val isForMe = sessionId.hexString == storage.getUserPublicKey()
Log.d(TAG, "Received kicked message, for us? ${isForMe}, message key generation = $messageGeneration, our key generation = $currentKeysGeneration")

if (isForMe && messageGeneration >= currentKeysGeneration) {
groupManagerV2.handleKicked(groupId)
}
}
}
groupRevokedMessageHandler.handleRevokeMessage(groupId, messages.map { it.data })
}

private fun handleGroupConfigMessages(
Expand Down Expand Up @@ -437,7 +403,7 @@ class GroupPoller(
snode = snode,
publicKey = groupId.hexString,
decrypt = it.groupKeys::decrypt,
namespace = Namespace.CLOSED_GROUP_MESSAGES(),
namespace = Namespace.GROUP_MESSAGES(),
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class GroupPollerManager @Inject constructor(
preferences: TextSecurePreferences,
appVisibilityManager: AppVisibilityManager,
connectivity: InternetConnectivity,
groupRevokedMessageHandler: GroupRevokedMessageHandler,
) {
@Suppress("OPT_IN_USAGE")
private val groupPollers: StateFlow<Map<AccountId, GroupPollerHandle>> =
Expand Down Expand Up @@ -117,6 +118,7 @@ class GroupPollerManager @Inject constructor(
lokiApiDatabase = lokiApiDatabase,
clock = clock,
appVisibilityManager = appVisibilityManager,
groupRevokedMessageHandler = groupRevokedMessageHandler,
),
scope = scope
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package org.thoughtcrime.securesms.groups

import network.loki.messenger.libsession_util.util.Sodium
import org.session.libsession.database.StorageProtocol
import org.session.libsession.messaging.groups.GroupManagerV2
import org.session.libsession.utilities.ConfigFactoryProtocol
import org.session.libsignal.utilities.AccountId
import org.session.libsignal.utilities.IdPrefix
import org.session.libsignal.utilities.Log
import javax.inject.Inject
import javax.inject.Provider

class GroupRevokedMessageHandler @Inject constructor(
private val configFactoryProtocol: ConfigFactoryProtocol,
private val storage: StorageProtocol,
private val groupManagerV2: Provider<GroupManagerV2>,
) {
suspend fun handleRevokeMessage(
groupId: AccountId,
rawMessages: List<ByteArray>,
) {
rawMessages.forEach { data ->
val decoded = configFactoryProtocol.decryptForUser(
data,
Sodium.KICKED_DOMAIN,
groupId,
)

if (decoded != null) {
// The message should be in the format of "<sessionIdPubKeyBinary><messageGenerationASCII>",
// where the pub key is 32 bytes, so we need to have at least 33 bytes of data
if (decoded.size < 33) {
Log.w(TAG, "Received an invalid kicked message, expecting at least 33 bytes, got ${decoded.size}")
return@forEach
}

val sessionId = AccountId(IdPrefix.STANDARD, decoded.copyOfRange(0, 32)) // copyOfRange: [start,end)
val messageGeneration = decoded.copyOfRange(32, decoded.size).decodeToString().toIntOrNull()
if (messageGeneration == null) {
Log.w(TAG, "Received an invalid kicked message: missing message generation")
return@forEach
}

val currentKeysGeneration = configFactoryProtocol.withGroupConfigs(groupId) {
it.groupKeys.currentGeneration()
}

val isForMe = sessionId.hexString == storage.getUserPublicKey()
Log.d(TAG, "Received kicked message, for us? ${isForMe}, message key generation = $messageGeneration, our key generation = $currentKeysGeneration")

if (isForMe && messageGeneration >= currentKeysGeneration) {
groupManagerV2.get().handleKicked(groupId)
}
}
}
}

companion object {
private const val TAG = "GroupRevokedMessageHandler"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ class RemoveGroupMemberHandler @Inject constructor(
// Call No 3. Conditionally send the `GroupUpdateDeleteMemberContent`
if (pendingRemovals.any { (member, status) -> member.shouldRemoveMessages(status) }) {
calls += SnodeAPI.buildAuthenticatedStoreBatchInfo(
namespace = Namespace.CLOSED_GROUP_MESSAGES(),
namespace = Namespace.GROUP_MESSAGES(),
message = buildDeleteGroupMemberContentMessage(
adminKey = adminKey,
groupAccountId = groupAccountId.hexString,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import androidx.core.content.ContextCompat.getString
import com.goterl.lazysodium.interfaces.AEAD
import com.goterl.lazysodium.utils.Key
import dagger.hilt.android.qualifiers.ApplicationContext
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import kotlinx.serialization.json.Json
import network.loki.messenger.R
import org.session.libsession.messaging.jobs.BatchMessageReceiveJob
Expand All @@ -20,6 +22,7 @@ import org.session.libsession.messaging.sending_receiving.notifications.PushNoti
import org.session.libsession.messaging.utilities.MessageWrapper
import org.session.libsession.messaging.utilities.SodiumUtilities
import org.session.libsession.messaging.utilities.SodiumUtilities.sodium
import org.session.libsession.utilities.ConfigMessage
import org.session.libsession.utilities.bencode.Bencode
import org.session.libsession.utilities.bencode.BencodeList
import org.session.libsession.utilities.bencode.BencodeString
Expand All @@ -30,13 +33,15 @@ import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.Namespace
import org.thoughtcrime.securesms.crypto.IdentityKeyUtil
import org.thoughtcrime.securesms.dependencies.ConfigFactory
import org.thoughtcrime.securesms.groups.GroupRevokedMessageHandler
import javax.inject.Inject

private const val TAG = "PushHandler"

class PushReceiver @Inject constructor(
@ApplicationContext private val context: Context,
private val configFactory: ConfigFactory
private val configFactory: ConfigFactory,
private val groupRevokedMessageHandler: GroupRevokedMessageHandler,
) {
private val json = Json { ignoreUnknownKeys = true }

Expand All @@ -56,32 +61,71 @@ class PushReceiver @Inject constructor(
addMessageReceiveJob(PushData(data = data, metadata = null))
}

private fun addMessageReceiveJob(pushData: PushData?){
private fun addMessageReceiveJob(pushData: PushData?) {
// send a generic notification if we have no data
if (pushData?.data == null) {
sendGenericNotification()
return
}

try {
val namespace = pushData.metadata?.namespace
val params = when {
pushData.metadata?.namespace == Namespace.CLOSED_GROUP_MESSAGES() -> {
namespace == Namespace.GROUP_MESSAGES() ||
namespace == Namespace.REVOKED_GROUP_MESSAGES() ||
namespace == Namespace.GROUP_INFO() ||
namespace == Namespace.GROUP_MEMBERS() ||
namespace == Namespace.GROUP_KEYS() -> {
val groupId = AccountId(requireNotNull(pushData.metadata.account) {
"Received a closed group message push notification without an account ID"
})

val envelop = checkNotNull(tryDecryptGroupMessage(groupId, pushData.data)) {
"Unable to decrypt closed group message"
if (namespace == Namespace.GROUP_MESSAGES()) {
val envelope = checkNotNull(tryDecryptGroupEnvelope(groupId, pushData.data)) {
"Unable to decrypt closed group message"
}

MessageReceiveParameters(
data = envelope.toByteArray(),
serverHash = pushData.metadata.msg_hash,
closedGroup = Destination.ClosedGroup(groupId.hexString)
)
} else if (namespace == Namespace.REVOKED_GROUP_MESSAGES()) {
GlobalScope.launch {
groupRevokedMessageHandler.handleRevokeMessage(groupId, listOf(pushData.data))
}

null
} else {
val hash = requireNotNull(pushData.metadata.msg_hash) {
"Received a closed group config push notification without a message hash"
}

// If we receive group config messages from notification, try to merge
// them directly
val configMessage = listOf(
ConfigMessage(
hash = hash,
data = pushData.data,
timestamp = pushData.metadata.timestampSeconds
)
)

configFactory.mergeGroupConfigMessages(
groupId = groupId,
keys = configMessage.takeIf { namespace == Namespace.GROUP_KEYS() }
.orEmpty(),
members = configMessage.takeIf { namespace == Namespace.GROUP_MEMBERS() }
.orEmpty(),
info = configMessage.takeIf { namespace == Namespace.GROUP_INFO() }
.orEmpty(),
)

null
}

MessageReceiveParameters(
data = envelop.toByteArray(),
serverHash = pushData.metadata.msg_hash,
closedGroup = Destination.ClosedGroup(groupId.hexString)
)
}

pushData.metadata?.namespace == 0 || pushData.metadata == null -> {
namespace == Namespace.DEFAULT() || pushData.metadata == null -> {
val envelopeAsData = MessageWrapper.unwrap(pushData.data).toByteArray()
MessageReceiveParameters(
data = envelopeAsData,
Expand All @@ -90,25 +134,30 @@ class PushReceiver @Inject constructor(
}

else -> {
Log.w(TAG, "Received a push notification with an unknown namespace: ${pushData.metadata.namespace}")
Log.w(TAG, "Received a push notification with an unknown namespace: $namespace")
return
}
}

JobQueue.shared.add(BatchMessageReceiveJob(listOf(params), null))
if (params != null) {
JobQueue.shared.add(BatchMessageReceiveJob(listOf(params), null))
}
} catch (e: Exception) {
Log.d(TAG, "Failed to unwrap data for message due to error.", e)
}

}


private fun tryDecryptGroupMessage(groupId: AccountId, data: ByteArray): Envelope? {
val (envelopBytes, sender) = checkNotNull(configFactory.withGroupConfigs(groupId) { it.groupKeys.decrypt(data) }) {
private fun tryDecryptGroupEnvelope(groupId: AccountId, data: ByteArray): Envelope? {
val (envelopBytes, sender) = checkNotNull(configFactory.withGroupConfigs(groupId) {
it.groupKeys.decrypt(
data
)
}) {
"Failed to decrypt group message"
}

Log.d(TAG, "Successfully decrypted group message from ${sender.hexString}")
Log.d(TAG, "Successfully decrypted group message from $sender")
return Envelope.parseFrom(envelopBytes)
.toBuilder()
.setSource(sender.hexString)
Expand Down
Loading

0 comments on commit 01b1d26

Please sign in to comment.