Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package org.session.libsession.messaging.jobs

import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel
Expand All @@ -11,6 +9,7 @@ import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsignal.utilities.Log
import java.lang.RuntimeException
import java.util.Timer
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
Expand All @@ -19,14 +18,16 @@ import kotlin.math.min
import kotlin.math.pow
import kotlin.math.roundToLong

@OptIn(ExperimentalCoroutinesApi::class)
class JobQueue : JobDelegate {
private var hasResumedPendingJobs = false // Just for debugging
private val jobTimestampMap = ConcurrentHashMap<Long, AtomicInteger>()

private val scope: CoroutineScope = GlobalScope
private val queue = Channel<Job>(UNLIMITED)
private val pendingJobIds = mutableSetOf<String>()

// Track the send message jobs that are pending or in progress. This doesn't take the
// first launch of the send message job into account
private val pendingSendMessageJobIDs = hashSetOf<String>()

private val openGroupChannels = mutableMapOf<String, Channel<Job>>()

Expand Down Expand Up @@ -92,12 +93,19 @@ class JobQueue : JobDelegate {
Log.d(dispatcherName,"processJob: ${javaClass.simpleName} (id: $id)")
delegate = this@JobQueue

try {
val runResult = runCatching {
execute(dispatcherName)
}
catch (e: Exception) {

// Remove the job from the pending "send message job" list, regardless of whether
// we are a send message job, as IDs are unique across all job types
synchronized(pendingSendMessageJobIDs) {
pendingSendMessageJobIDs.remove(id)
}

runResult.onFailure { e ->
Log.d(dispatcherName, "unhandledJobException: ${javaClass.simpleName} (id: $id)", e)
this@JobQueue.handleJobFailed(this, dispatcherName, e)
this@JobQueue.handleJobFailed(this, dispatcherName, e as? Exception ?: RuntimeException(e))
}
}

Expand Down Expand Up @@ -181,7 +189,13 @@ class JobQueue : JobDelegate {
Log.e("Loki", "tried to resume pending send job with no ID")
return
}
if (!pendingJobIds.add(id)) {

// Check if the job is already in progress and mark it as in progress if it is not
val jobIsInProgress = synchronized(pendingSendMessageJobIDs) {
!pendingSendMessageJobIDs.add(id)
}

if (jobIsInProgress) {
Log.e("Loki","tried to re-queue pending/in-progress job (id: $id)")
return
}
Expand Down Expand Up @@ -234,7 +248,6 @@ class JobQueue : JobDelegate {
override fun handleJobSucceeded(job: Job, dispatcherName: String) {
val jobId = job.id ?: return
MessagingModuleConfiguration.shared.storage.markJobAsSucceeded(jobId)
pendingJobIds.remove(jobId)
}

override fun handleJobFailed(job: Job, dispatcherName: String, error: Exception) {
Expand Down