11package org.session.libsession.messaging.jobs
22
3- import kotlinx.coroutines.CoroutineDispatcher
43import kotlinx.coroutines.CoroutineScope
5- import kotlinx.coroutines.Dispatchers
64import kotlinx.coroutines.ExperimentalCoroutinesApi
75import kotlinx.coroutines.GlobalScope
86import kotlinx.coroutines.channels.Channel
@@ -11,6 +9,7 @@ import kotlinx.coroutines.isActive
119import kotlinx.coroutines.launch
1210import org.session.libsession.messaging.MessagingModuleConfiguration
1311import org.session.libsignal.utilities.Log
12+ import java.lang.RuntimeException
1413import java.util.Timer
1514import java.util.concurrent.ConcurrentHashMap
1615import java.util.concurrent.atomic.AtomicInteger
@@ -19,14 +18,16 @@ import kotlin.math.min
1918import kotlin.math.pow
2019import kotlin.math.roundToLong
2120
22- @OptIn(ExperimentalCoroutinesApi ::class )
2321class JobQueue : JobDelegate {
2422 private var hasResumedPendingJobs = false // Just for debugging
2523 private val jobTimestampMap = ConcurrentHashMap <Long , AtomicInteger >()
2624
2725 private val scope: CoroutineScope = GlobalScope
2826 private val queue = Channel <Job >(UNLIMITED )
29- private val pendingJobIds = mutableSetOf<String >()
27+
28+ // Track the send message jobs that are pending or in progress. This doesn't take the
29+ // first launch of the send message job into account
30+ private val pendingSendMessageJobIDs = hashSetOf<String >()
3031
3132 private val openGroupChannels = mutableMapOf<String , Channel <Job >>()
3233
@@ -92,12 +93,19 @@ class JobQueue : JobDelegate {
9293 Log .d(dispatcherName," processJob: ${javaClass.simpleName} (id: $id )" )
9394 delegate = this @JobQueue
9495
95- try {
96+ val runResult = runCatching {
9697 execute(dispatcherName)
9798 }
98- catch (e: Exception ) {
99+
100+ // Remove the job from the pending "send message job" list, regardless of whether
101+ // we are a send message job, as IDs are unique across all job types
102+ synchronized(pendingSendMessageJobIDs) {
103+ pendingSendMessageJobIDs.remove(id)
104+ }
105+
106+ runResult.onFailure { e ->
99107 Log .d(dispatcherName, " unhandledJobException: ${javaClass.simpleName} (id: $id )" , e)
100- this @JobQueue.handleJobFailed(this , dispatcherName, e)
108+ this @JobQueue.handleJobFailed(this , dispatcherName, e as ? Exception ? : RuntimeException (e) )
101109 }
102110 }
103111
@@ -181,7 +189,13 @@ class JobQueue : JobDelegate {
181189 Log .e(" Loki" , " tried to resume pending send job with no ID" )
182190 return
183191 }
184- if (! pendingJobIds.add(id)) {
192+
193+ // Check if the job is already in progress and mark it as in progress if it is not
194+ val jobIsInProgress = synchronized(pendingSendMessageJobIDs) {
195+ ! pendingSendMessageJobIDs.add(id)
196+ }
197+
198+ if (jobIsInProgress) {
185199 Log .e(" Loki" ," tried to re-queue pending/in-progress job (id: $id )" )
186200 return
187201 }
@@ -234,7 +248,6 @@ class JobQueue : JobDelegate {
234248 override fun handleJobSucceeded (job : Job , dispatcherName : String ) {
235249 val jobId = job.id ? : return
236250 MessagingModuleConfiguration .shared.storage.markJobAsSucceeded(jobId)
237- pendingJobIds.remove(jobId)
238251 }
239252
240253 override fun handleJobFailed (job : Job , dispatcherName : String , error : Exception ) {
0 commit comments