From 32d5bfb132af08b9458e70ef1093b9eb7636034f Mon Sep 17 00:00:00 2001 From: SessionHero01 <180888785+SessionHero01@users.noreply.github.com> Date: Thu, 27 Feb 2025 16:05:24 +1100 Subject: [PATCH] Fix job queue issue resuming message send job (#990) --- .../libsession/messaging/jobs/JobQueue.kt | 31 +++++++++++++------ 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt index 33a7f8b0f69..a1f57124118 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt @@ -1,8 +1,6 @@ package org.session.libsession.messaging.jobs -import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.channels.Channel @@ -11,6 +9,7 @@ import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsignal.utilities.Log +import java.lang.RuntimeException import java.util.Timer import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicInteger @@ -19,14 +18,16 @@ import kotlin.math.min import kotlin.math.pow import kotlin.math.roundToLong -@OptIn(ExperimentalCoroutinesApi::class) class JobQueue : JobDelegate { private var hasResumedPendingJobs = false // Just for debugging private val jobTimestampMap = ConcurrentHashMap() private val scope: CoroutineScope = GlobalScope private val queue = Channel(UNLIMITED) - private val pendingJobIds = mutableSetOf() + + // Track the send message jobs that are pending or in progress. This doesn't take the + // first launch of the send message job into account + private val pendingSendMessageJobIDs = hashSetOf() private val openGroupChannels = mutableMapOf>() @@ -92,12 +93,19 @@ class JobQueue : JobDelegate { Log.d(dispatcherName,"processJob: ${javaClass.simpleName} (id: $id)") delegate = this@JobQueue - try { + val runResult = runCatching { execute(dispatcherName) } - catch (e: Exception) { + + // Remove the job from the pending "send message job" list, regardless of whether + // we are a send message job, as IDs are unique across all job types + synchronized(pendingSendMessageJobIDs) { + pendingSendMessageJobIDs.remove(id) + } + + runResult.onFailure { e -> Log.d(dispatcherName, "unhandledJobException: ${javaClass.simpleName} (id: $id)", e) - this@JobQueue.handleJobFailed(this, dispatcherName, e) + this@JobQueue.handleJobFailed(this, dispatcherName, e as? Exception ?: RuntimeException(e)) } } @@ -181,7 +189,13 @@ class JobQueue : JobDelegate { Log.e("Loki", "tried to resume pending send job with no ID") return } - if (!pendingJobIds.add(id)) { + + // Check if the job is already in progress and mark it as in progress if it is not + val jobIsInProgress = synchronized(pendingSendMessageJobIDs) { + !pendingSendMessageJobIDs.add(id) + } + + if (jobIsInProgress) { Log.e("Loki","tried to re-queue pending/in-progress job (id: $id)") return } @@ -234,7 +248,6 @@ class JobQueue : JobDelegate { override fun handleJobSucceeded(job: Job, dispatcherName: String) { val jobId = job.id ?: return MessagingModuleConfiguration.shared.storage.markJobAsSucceeded(jobId) - pendingJobIds.remove(jobId) } override fun handleJobFailed(job: Job, dispatcherName: String, error: Exception) {