diff --git a/docs/gitbook/SUMMARY.md b/docs/gitbook/SUMMARY.md index 012b930218..fa3416a74c 100644 --- a/docs/gitbook/SUMMARY.md +++ b/docs/gitbook/SUMMARY.md @@ -30,6 +30,7 @@ * [LIFO](guide/jobs/lifo.md) * [Job Ids](guide/jobs/job-ids.md) * [Job Data](guide/jobs/job-data.md) + * [Debouncing](guide/jobs/debouncing.md) * [Delayed](guide/jobs/delayed.md) * [Repeatable](guide/jobs/repeatable.md) * [Prioritized](guide/jobs/prioritized.md) diff --git a/docs/gitbook/guide/jobs/debouncing.md b/docs/gitbook/guide/jobs/debouncing.md new file mode 100644 index 0000000000..276bf3a503 --- /dev/null +++ b/docs/gitbook/guide/jobs/debouncing.md @@ -0,0 +1,49 @@ +# Debouncing + +Debouncing in BullMQ is a process where job execution is delayed and deduplicated based on specific identifiers. It ensures that within a specified period, or until a specific job is completed or failed, no new jobs with the same identifier will be added to the queue. Instead, these attempts will trigger a debounced event. + +## Fixed Mode + +In the Fixed Mode, debouncing works by assigning a delay (Time to Live, TTL) to a job upon its creation. If a similar job (identified by a unique debouncer ID) is added during this delay period, it is ignored. This prevents the queue from being overwhelmed with multiple instances of the same task, thus optimizing the processing time and resource utilization. + +```typescript +import { Queue } from 'bullmq'; + +const myQueue = new Queue('Paint'); + +// Add a job that will be debounced for 5 seconds. +await myQueue.add( + 'house', + { color: 'white' }, + { debounce: { id: 'customValue', ttl: 5000 } }, +); +``` + +In this example, after adding the house painting job with the debouncing parameters (id and ttl), any subsequent job with the same debouncing ID customValue added within 5 seconds will be ignored. This is useful for scenarios where rapid, repetitive requests are made, such as multiple users or processes attempting to trigger the same job. + +Note that you must provide a debounce id that should represent your job. You can hash your entire job data or a subset of attributes for creating this identifier. + +## Extended Mode + +The Extended Mode takes a different approach by extending the debouncing duration until the job's completion or failure. This means as long as the job remains in an incomplete state (neither succeeded nor failed), any subsequent job with the same debouncer ID will be ignored. + +```typescript +// Add a job that will be debounced as this record is not finished (completed or failed). +await myQueue.add( + 'house', + { color: 'white' }, + { debounce: { id: 'customValue' } }, +); +``` + +While this job is not moved to completed or failed state, next jobs added with same **debounce id** will be ignored and a _debounced_ event will be triggered by our QueueEvent class. + +This mode is particularly useful for jobs that have a long running time or those that must not be duplicated until they are resolved, such as processing a file upload or performing a critical update that should not be repeated if the initial attempt is still in progress. + +{% hint style="warning" %} +Any manual deletion will disable the debouncing. For example, when calling _job.remove_ method. +{% endhint %} + +## Read more: + +- 💡 [Add Job API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#add) diff --git a/src/classes/job.ts b/src/classes/job.ts index d13b895059..06035eefd9 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -38,6 +38,7 @@ import type { QueueEvents } from './queue-events'; const logger = debuglog('bull'); const optsDecodeMap = { + de: 'debounce', fpof: 'failParentOnFailure', idof: 'ignoreDependencyOnFailure', kl: 'keepLogs', @@ -136,6 +137,11 @@ export class Job< */ parent?: ParentKeys; + /** + * Debounce identifier. + */ + debounceId?: string; + /** * Base repeat job key. */ @@ -199,6 +205,8 @@ export class Job< ? { id: opts.parent.id, queueKey: opts.parent.queue } : undefined; + this.debounceId = opts.debounce ? opts.debounce.id : undefined; + this.toKey = queue.toKey.bind(queue); this.setScripts(); @@ -322,6 +330,10 @@ export class Job< job.repeatJobKey = json.rjk; } + if (json.deid) { + job.debounceId = json.deid; + } + job.failedReason = json.failedReason; job.attemptsStarted = parseInt(json.ats || '0'); @@ -445,6 +457,7 @@ export class Job< timestamp: this.timestamp, failedReason: JSON.stringify(this.failedReason), stacktrace: JSON.stringify(this.stacktrace), + debounceId: this.debounceId, repeatJobKey: this.repeatJobKey, returnvalue: JSON.stringify(this.returnvalue), }; diff --git a/src/classes/queue-events.ts b/src/classes/queue-events.ts index 44dae24740..27201c26ca 100644 --- a/src/classes/queue-events.ts +++ b/src/classes/queue-events.ts @@ -45,6 +45,13 @@ export interface QueueEventsListener extends IoredisListener { id: string, ) => void; + /** + * Listen to 'debounced' event. + * + * This event is triggered when a job is debounced because debouncedId still existed. + */ + debounced: (args: { jobId: string }, id: string) => void; + /** * Listen to 'delayed' event. * diff --git a/src/classes/queue-keys.ts b/src/classes/queue-keys.ts index 29224c2bbd..9109ae8dba 100644 --- a/src/classes/queue-keys.ts +++ b/src/classes/queue-keys.ts @@ -24,6 +24,7 @@ export class QueueKeys { 'events', 'pc', // priority counter key 'marker', // marker key + 'de', // debounce key ].forEach(key => { keys[key] = this.toKey(name, key); }); diff --git a/src/classes/queue.ts b/src/classes/queue.ts index 834aa53eab..0e6b6cded4 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -187,10 +187,10 @@ export class Queue< * Get global concurrency value. * Returns null in case no value is set. */ - async getGlobalConcurrency():Promise { + async getGlobalConcurrency(): Promise { const client = await this.client; const concurrency = await client.hget(this.keys.meta, 'concurrency'); - if(concurrency){ + if (concurrency) { return Number(concurrency); } return null; @@ -203,12 +203,11 @@ export class Queue< * is processed at any given time. If this limit is not defined, there will be no * restriction on the number of concurrent jobs. */ - async setGlobalConcurrency(concurrency: number) { - const client = await this.client; - return client.hset(this.keys.meta, 'concurrency', concurrency); - } - - + async setGlobalConcurrency(concurrency: number) { + const client = await this.client; + return client.hset(this.keys.meta, 'concurrency', concurrency); + } + /** * Adds a new job to the queue. * @@ -374,6 +373,17 @@ export class Queue< return !removed; } + /** + * Removes a debounce key. + * + * @param id - identifier + */ + async removeDebounceKey(id: string): Promise { + const client = await this.client; + + return client.del(`${this.keys.de}:${id}`); + } + /** * Removes a repeatable job by its key. Note that the key is the one used * to store the repeatable job metadata and not one of the job iterations diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index ec66c5a366..df3f722c28 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -183,6 +183,7 @@ export class Scripts { parentOpts.parentDependenciesKey || null, parent, job.repeatJobKey, + job.debounceId ? `${queueKeys.de}:${job.debounceId}` : null, ]; let encodedOpts; diff --git a/src/commands/addDelayedJob-6.lua b/src/commands/addDelayedJob-6.lua index 4a42362534..cbec0dcf63 100644 --- a/src/commands/addDelayedJob-6.lua +++ b/src/commands/addDelayedJob-6.lua @@ -25,6 +25,7 @@ [7] parent dependencies key. [8] parent? {id, queueKey} [9] repeat job key + [10] debounce key ARGV[2] Json stringified job data ARGV[3] msgpacked options @@ -49,12 +50,14 @@ local args = cmsgpack.unpack(ARGV[1]) local data = ARGV[2] local parentKey = args[5] -local repeatJobKey = args[9] local parent = args[8] +local repeatJobKey = args[9] +local debounceKey = args[10] local parentData -- Includes --- @include "includes/addDelayMarkerIfNeeded" +--- @include "includes/debounceJob" --- @include "includes/getDelayedScore" --- @include "includes/getOrSetMaxEvents" --- @include "includes/handleDuplicatedJob" @@ -73,6 +76,7 @@ local opts = cmsgpack.unpack(ARGV[3]) local parentDependenciesKey = args[7] local timestamp = args[4] + if args[2] == "" then jobId = jobCounter jobIdKey = args[1] .. jobId @@ -86,6 +90,12 @@ else end end +local debouncedJobId = debounceJob(args[1], opts['de'], + jobId, debounceKey, eventsKey, maxEvents) +if debouncedJobId then + return debouncedJobId +end + -- Store the job. local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp, parentKey, parentData, diff --git a/src/commands/addParentJob-4.lua b/src/commands/addParentJob-4.lua index 4dc0c87ae9..0cb89372dc 100644 --- a/src/commands/addParentJob-4.lua +++ b/src/commands/addParentJob-4.lua @@ -20,6 +20,7 @@ [7] parent dependencies key. [8] parent? {id, queueKey} [9] repeat job key + [10] debounce key ARGV[2] Json stringified job data ARGV[3] msgpacked options @@ -44,11 +45,13 @@ local data = ARGV[2] local opts = cmsgpack.unpack(ARGV[3]) local parentKey = args[5] -local repeatJobKey = args[9] local parent = args[8] +local repeatJobKey = args[9] +local debounceKey = args[10] local parentData -- Includes +--- @include "includes/debounceJob" --- @include "includes/getOrSetMaxEvents" --- @include "includes/handleDuplicatedJob" --- @include "includes/storeJob" @@ -78,6 +81,12 @@ else end end +local debouncedJobId = debounceJob(args[1], opts['de'], + jobId, debounceKey, eventsKey, maxEvents) +if debouncedJobId then + return debouncedJobId +end + -- Store the job. storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp, parentKey, parentData, repeatJobKey) diff --git a/src/commands/addPrioritizedJob-8.lua b/src/commands/addPrioritizedJob-8.lua index 53b6410c97..288e779154 100644 --- a/src/commands/addPrioritizedJob-8.lua +++ b/src/commands/addPrioritizedJob-8.lua @@ -24,7 +24,8 @@ [7] parent dependencies key. [8] parent? {id, queueKey} [9] repeat job key - + [10] debounce key + ARGV[2] Json stringified job data ARGV[3] msgpacked options @@ -51,12 +52,14 @@ local data = ARGV[2] local opts = cmsgpack.unpack(ARGV[3]) local parentKey = args[5] -local repeatJobKey = args[9] local parent = args[8] +local repeatJobKey = args[9] +local debounceKey = args[10] local parentData -- Includes --- @include "includes/addJobWithPriority" +--- @include "includes/debounceJob" --- @include "includes/storeJob" --- @include "includes/getOrSetMaxEvents" --- @include "includes/handleDuplicatedJob" @@ -87,6 +90,12 @@ else end end +local debouncedJobId = debounceJob(args[1], opts['de'], + jobId, debounceKey, eventsKey, maxEvents) +if debouncedJobId then + return debouncedJobId +end + -- Store the job. local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp, parentKey, parentData, diff --git a/src/commands/addStandardJob-8.lua b/src/commands/addStandardJob-8.lua index ed07744abe..54fb363d73 100644 --- a/src/commands/addStandardJob-8.lua +++ b/src/commands/addStandardJob-8.lua @@ -34,7 +34,8 @@ [7] parent dependencies key. [8] parent? {id, queueKey} [9] repeat job key - + [10] debounce key + ARGV[2] Json stringified job data ARGV[3] msgpacked options @@ -54,12 +55,14 @@ local data = ARGV[2] local opts = cmsgpack.unpack(ARGV[3]) local parentKey = args[5] -local repeatJobKey = args[9] local parent = args[8] +local repeatJobKey = args[9] +local debounceKey = args[10] local parentData -- Includes --- @include "includes/addJobInTargetList" +--- @include "includes/debounceJob" --- @include "includes/getOrSetMaxEvents" --- @include "includes/getTargetQueueList" --- @include "includes/handleDuplicatedJob" @@ -91,6 +94,12 @@ else end end +local debouncedJobId = debounceJob(args[1], opts['de'], + jobId, debounceKey, eventsKey, maxEvents) +if debouncedJobId then + return debouncedJobId +end + -- Store the job. storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp, parentKey, parentData, repeatJobKey) diff --git a/src/commands/includes/cleanList.lua b/src/commands/includes/cleanList.lua index 99b397ef53..5dbae4abc3 100644 --- a/src/commands/includes/cleanList.lua +++ b/src/commands/includes/cleanList.lua @@ -34,7 +34,7 @@ local function cleanList(listKey, jobKeyPrefix, rangeStart, rangeEnd, -- replace the entry with a deletion marker; the actual deletion will -- occur at the end of the script rcall("LSET", listKey, rangeEnd - jobIdsLen + i, deletionMarker) - removeJob(job, true, jobKeyPrefix) + removeJob(job, true, jobKeyPrefix, true --[[remove debounce key]]) deletedCount = deletedCount + 1 table.insert(deleted, job) end diff --git a/src/commands/includes/cleanSet.lua b/src/commands/includes/cleanSet.lua index f83393d50a..c48b098693 100644 --- a/src/commands/includes/cleanSet.lua +++ b/src/commands/includes/cleanSet.lua @@ -21,14 +21,14 @@ local function cleanSet(setKey, jobKeyPrefix, rangeEnd, timestamp, limit, attrib local jobKey = jobKeyPrefix .. job if isFinished then - removeJob(job, true, jobKeyPrefix) + removeJob(job, true, jobKeyPrefix, true --[[remove debounce key]]) deletedCount = deletedCount + 1 table.insert(deleted, job) else -- * finishedOn says when the job was completed, but it isn't set unless the job has actually completed jobTS = getTimestamp(jobKey, attributes) if (not jobTS or jobTS <= timestamp) then - removeJob(job, true, jobKeyPrefix) + removeJob(job, true, jobKeyPrefix, true --[[remove debounce key]]) deletedCount = deletedCount + 1 table.insert(deleted, job) end diff --git a/src/commands/includes/debounceJob.lua b/src/commands/includes/debounceJob.lua new file mode 100644 index 0000000000..b40f4cb8db --- /dev/null +++ b/src/commands/includes/debounceJob.lua @@ -0,0 +1,23 @@ +--[[ + Function to debounce a job. +]] + +local function debounceJob(prefixKey, debounceOpts, jobId, debounceKey, eventsKey, maxEvents) + local debounceId = debounceOpts and debounceOpts['id'] + if debounceId then + local ttl = debounceOpts['ttl'] + local debounceKeyExists + if ttl then + debounceKeyExists = not rcall('SET', debounceKey, jobId, 'PX', ttl, 'NX') + else + debounceKeyExists = not rcall('SET', debounceKey, jobId, 'NX') + end + if debounceKeyExists then + local currentDebounceJobId = rcall('GET', debounceKey) + rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", + "debounced", "jobId", currentDebounceJobId) + return currentDebounceJobId + end + end +end + \ No newline at end of file diff --git a/src/commands/includes/moveParentFromWaitingChildrenToFailed.lua b/src/commands/includes/moveParentFromWaitingChildrenToFailed.lua index bd6a148a6a..da10722d7c 100644 --- a/src/commands/includes/moveParentFromWaitingChildrenToFailed.lua +++ b/src/commands/includes/moveParentFromWaitingChildrenToFailed.lua @@ -4,6 +4,7 @@ -- Includes --- @include "moveParentToWaitIfNeeded" +--- @include "removeDebounceKeyIfNeeded" local function moveParentFromWaitingChildrenToFailed( parentQueueKey, parentKey, parentId, jobIdKey, timestamp) if rcall("ZREM", parentQueueKey .. ":waiting-children", parentId) == 1 then @@ -13,10 +14,12 @@ local function moveParentFromWaitingChildrenToFailed( parentQueueKey, parentKey, rcall("XADD", parentQueueKey .. ":events", "*", "event", "failed", "jobId", parentId, "failedReason", failedReason, "prev", "waiting-children") - local rawParentData = rcall("HGET", parentKey, "parent") + local jobAttributes = rcall("HMGET", parentKey, "parent", "deid") - if rawParentData ~= false then - local parentData = cjson.decode(rawParentData) + removeDebounceKeyIfNeeded(parentQueueKey, jobAttributes[2]) + + if jobAttributes[1] then + local parentData = cjson.decode(jobAttributes[1]) if parentData['fpof'] then moveParentFromWaitingChildrenToFailed( parentData['queueKey'], diff --git a/src/commands/includes/removeDebounceKey.lua b/src/commands/includes/removeDebounceKey.lua new file mode 100644 index 0000000000..5c87385824 --- /dev/null +++ b/src/commands/includes/removeDebounceKey.lua @@ -0,0 +1,12 @@ +--[[ + Function to remove debounce key. +]] + +local function removeDebounceKey(prefixKey, jobKey) + local debounceId = rcall("HGET", jobKey, "deid") + if debounceId then + local debounceKey = prefixKey .. "de:" .. debounceId + rcall("DEL", debounceKey) + end +end + \ No newline at end of file diff --git a/src/commands/includes/removeDebounceKeyIfNeeded.lua b/src/commands/includes/removeDebounceKeyIfNeeded.lua new file mode 100644 index 0000000000..fbc058e20f --- /dev/null +++ b/src/commands/includes/removeDebounceKeyIfNeeded.lua @@ -0,0 +1,14 @@ +--[[ + Function to remove debounce key if needed. +]] + +local function removeDebounceKeyIfNeeded(prefixKey, debounceId) + if debounceId then + local debounceKey = prefixKey .. "de:" .. debounceId + local pttl = rcall("PTTL", debounceKey) + + if pttl == 0 or pttl == -1 then + rcall("DEL", debounceKey) + end + end +end diff --git a/src/commands/includes/removeJob.lua b/src/commands/includes/removeJob.lua index c9c02a182c..8eb245feac 100644 --- a/src/commands/includes/removeJob.lua +++ b/src/commands/includes/removeJob.lua @@ -3,11 +3,15 @@ ]] -- Includes +--- @include "removeDebounceKey" --- @include "removeJobKeys" --- @include "removeParentDependencyKey" -local function removeJob(jobId, hard, baseKey) +local function removeJob(jobId, hard, baseKey, shouldRemoveDebounceKey) local jobKey = baseKey .. jobId removeParentDependencyKey(jobKey, hard, nil, baseKey) + if shouldRemoveDebounceKey then + removeDebounceKey(baseKey, jobKey) + end removeJobKeys(jobKey) end diff --git a/src/commands/includes/removeJobs.lua b/src/commands/includes/removeJobs.lua index 120bc7e00c..58b67abd97 100644 --- a/src/commands/includes/removeJobs.lua +++ b/src/commands/includes/removeJobs.lua @@ -7,7 +7,7 @@ local function removeJobs(keys, hard, baseKey, max) for i, key in ipairs(keys) do - removeJob(key, hard, baseKey) + removeJob(key, hard, baseKey, true --[[remove debounce key]]) end return max - #keys end diff --git a/src/commands/includes/removeJobsByMaxAge.lua b/src/commands/includes/removeJobsByMaxAge.lua index 2acd93b11c..ca24fad3a7 100644 --- a/src/commands/includes/removeJobsByMaxAge.lua +++ b/src/commands/includes/removeJobsByMaxAge.lua @@ -5,11 +5,12 @@ -- Includes --- @include "removeJob" -local function removeJobsByMaxAge(timestamp, maxAge, targetSet, prefix) +local function removeJobsByMaxAge(timestamp, maxAge, targetSet, prefix, + shouldRemoveDebounceKey) local start = timestamp - maxAge * 1000 local jobIds = rcall("ZREVRANGEBYSCORE", targetSet, start, "-inf") for i, jobId in ipairs(jobIds) do - removeJob(jobId, false, prefix) + removeJob(jobId, false, prefix, false --[[remove debounce key]]) end rcall("ZREMRANGEBYSCORE", targetSet, "-inf", start) end diff --git a/src/commands/includes/removeJobsByMaxCount.lua b/src/commands/includes/removeJobsByMaxCount.lua index 9fd31d3025..af52c612c4 100644 --- a/src/commands/includes/removeJobsByMaxCount.lua +++ b/src/commands/includes/removeJobsByMaxCount.lua @@ -9,7 +9,7 @@ local function removeJobsByMaxCount(maxCount, targetSet, prefix) local start = maxCount local jobIds = rcall("ZREVRANGE", targetSet, start, -1) for i, jobId in ipairs(jobIds) do - removeJob(jobId, false, prefix) + removeJob(jobId, false, prefix, false --[[remove debounce key]]) end rcall("ZREMRANGEBYRANK", targetSet, 0, -(maxCount + 1)) end diff --git a/src/commands/includes/removeParentDependencyKey.lua b/src/commands/includes/removeParentDependencyKey.lua index dee16ca994..87262850c9 100644 --- a/src/commands/includes/removeParentDependencyKey.lua +++ b/src/commands/includes/removeParentDependencyKey.lua @@ -21,7 +21,7 @@ local function moveParentToWait(parentPrefix, parentId, emitEvent) end end -local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey) +local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey, debounceId) if parentKey then local parentDependenciesKey = parentKey .. ":dependencies" local result = rcall("SREM", parentDependenciesKey, jobKey) @@ -36,8 +36,11 @@ local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey) if numRemovedElements == 1 then if hard then -- remove parent in same queue if parentPrefix == baseKey then - removeParentDependencyKey(parentKey, hard, nil, baseKey) + removeParentDependencyKey(parentKey, hard, nil, baseKey, nil) removeJobKeys(parentKey) + if debounceId then + rcall("DEL", parentPrefix .. "de:" .. debounceId) + end else moveParentToWait(parentPrefix, parentId) end @@ -49,7 +52,8 @@ local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey) return true end else - local missedParentKey = rcall("HGET", jobKey, "parentKey") + local parentAttributes = rcall("HMGET", jobKey, "parentKey", "deid") + local missedParentKey = parentAttributes[1] if( (type(missedParentKey) == "string") and missedParentKey ~= "" and (rcall("EXISTS", missedParentKey) == 1)) then local parentDependenciesKey = missedParentKey .. ":dependencies" @@ -65,8 +69,11 @@ local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey) if numRemovedElements == 1 then if hard then if parentPrefix == baseKey then - removeParentDependencyKey(missedParentKey, hard, nil, baseKey) + removeParentDependencyKey(missedParentKey, hard, nil, baseKey, nil) removeJobKeys(missedParentKey) + if parentAttributes[2] then + rcall("DEL", parentPrefix .. "de:" .. parentAttributes[2]) + end else moveParentToWait(parentPrefix, parentId) end diff --git a/src/commands/includes/storeJob.lua b/src/commands/includes/storeJob.lua index f9c4728052..ef53c9cba1 100644 --- a/src/commands/includes/storeJob.lua +++ b/src/commands/includes/storeJob.lua @@ -6,7 +6,8 @@ local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp, local jsonOpts = cjson.encode(opts) local delay = opts['delay'] or 0 local priority = opts['priority'] or 0 - + local debounceId = opts['de'] and opts['de']['id'] + local optionalValues = {} if parentKey ~= nil then table.insert(optionalValues, "parentKey") @@ -20,6 +21,11 @@ local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp, table.insert(optionalValues, repeatJobKey) end + if debounceId then + table.insert(optionalValues, "deid") + table.insert(optionalValues, debounceId) + end + rcall("HMSET", jobIdKey, "name", name, "data", data, "opts", jsonOpts, "timestamp", timestamp, "delay", delay, "priority", priority, unpack(optionalValues)) diff --git a/src/commands/moveStalledJobsToWait-9.lua b/src/commands/moveStalledJobsToWait-9.lua index 9660e77fcf..fe39587ebc 100644 --- a/src/commands/moveStalledJobsToWait-9.lua +++ b/src/commands/moveStalledJobsToWait-9.lua @@ -29,6 +29,7 @@ local rcall = redis.call --- @include "includes/getTargetQueueList" --- @include "includes/moveParentFromWaitingChildrenToFailed" --- @include "includes/moveParentToWaitIfNeeded" +--- @include "includes/removeDebounceKeyIfNeeded" --- @include "includes/removeJob" --- @include "includes/removeJobsByMaxAge" --- @include "includes/removeJobsByMaxCount" @@ -83,12 +84,14 @@ if (#stalling > 0) then local stalledCount = rcall("HINCRBY", jobKey, "stalledCounter", 1) if (stalledCount > MAX_STALLED_JOB_COUNT) then - local jobAttributes = rcall("HMGET", jobKey, "opts", "parent") + local jobAttributes = rcall("HMGET", jobKey, "opts", "parent", "deid") local rawOpts = jobAttributes[1] local rawParentData = jobAttributes[2] local opts = cjson.decode(rawOpts) local removeOnFailType = type(opts["removeOnFail"]) rcall("ZADD", failedKey, timestamp, jobId) + removeDebounceKeyIfNeeded(queueKeyPrefix, jobAttributes[3]) + local failedReason = "job stalled more than allowable limit" rcall("HMSET", jobKey, "failedReason", failedReason, @@ -122,7 +125,8 @@ if (#stalling > 0) then failedKey, queueKeyPrefix) elseif removeOnFailType == "boolean" then if opts["removeOnFail"] then - removeJob(jobId, false, queueKeyPrefix) + removeJob(jobId, false, queueKeyPrefix, + false --[[remove debounce key]]) rcall("ZREM", failedKey, jobId) end elseif removeOnFailType ~= "nil" then diff --git a/src/commands/moveToFinished-14.lua b/src/commands/moveToFinished-14.lua index 2edbe03835..dc396da706 100644 --- a/src/commands/moveToFinished-14.lua +++ b/src/commands/moveToFinished-14.lua @@ -65,6 +65,7 @@ local rcall = redis.call --- @include "includes/moveParentToWaitIfNeeded" --- @include "includes/prepareJobForProcessing" --- @include "includes/promoteDelayedJobs" +--- @include "includes/removeDebounceKeyIfNeeded" --- @include "includes/removeJobKeys" --- @include "includes/removeJobsByMaxAge" --- @include "includes/removeJobsByMaxCount" @@ -93,12 +94,12 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists return -4 end - local parentReferences = rcall("HMGET", jobIdKey, "parentKey", "parent") - local parentKey = parentReferences[1] or "" + local jobAttributes = rcall("HMGET", jobIdKey, "parentKey", "parent", "deid") + local parentKey = jobAttributes[1] or "" local parentId = "" local parentQueueKey = "" - if parentReferences[2] ~= false then - local jsonDecodedParent = cjson.decode(parentReferences[2]) + if jobAttributes[2] ~= false then + local jsonDecodedParent = cjson.decode(jobAttributes[2]) parentId = jsonDecodedParent['id'] parentQueueKey = jsonDecodedParent['queueKey'] end @@ -116,6 +117,10 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists -- Trim events before emiting them to avoid trimming events emitted in this script trimEvents(metaKey, eventStreamKey) + local prefix = ARGV[7] + + removeDebounceKeyIfNeeded(prefix, jobAttributes[3]) + -- If job has a parent we need to -- 1) remove this job id from parents dependencies -- 2) move the job Id to parent "processed" set @@ -164,8 +169,6 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists -- "returnvalue" / "failedReason" and "finishedOn" -- Remove old jobs? - local prefix = ARGV[7] - if maxAge ~= nil then removeJobsByMaxAge(timestamp, maxAge, targetSet, prefix) end @@ -179,7 +182,7 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists -- TODO: when a child is removed when finished, result or failure in parent -- must not be deleted, those value references should be deleted when the parent -- is deleted - removeParentDependencyKey(jobIdKey, false, parentKey) + removeParentDependencyKey(jobIdKey, false, parentKey, jobAttributes[3]) end end @@ -205,7 +208,7 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[2], KEYS[1], KEYS[8]) -- Check if there are delayed jobs that can be promoted - promoteDelayedJobs(KEYS[7], KEYS[14], target, KEYS[3], eventStreamKey, ARGV[7], + promoteDelayedJobs(KEYS[7], KEYS[14], target, KEYS[3], eventStreamKey, prefix, timestamp, KEYS[10], isPausedOrMaxed) local maxJobs = tonumber(opts['limiter'] and opts['limiter']['max']) @@ -229,19 +232,19 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists if jobId == "0:0" then jobId = moveJobFromPriorityToActive(KEYS[3], KEYS[2], KEYS[10]) - return prepareJobForProcessing(ARGV[7], KEYS[6], eventStreamKey, jobId, + return prepareJobForProcessing(prefix, KEYS[6], eventStreamKey, jobId, timestamp, maxJobs, opts) end else - return prepareJobForProcessing(ARGV[7], KEYS[6], eventStreamKey, jobId, + return prepareJobForProcessing(prefix, KEYS[6], eventStreamKey, jobId, timestamp, maxJobs, opts) end else jobId = moveJobFromPriorityToActive(KEYS[3], KEYS[2], KEYS[10]) if jobId then - return prepareJobForProcessing(ARGV[7], KEYS[6], eventStreamKey, jobId, + return prepareJobForProcessing(prefix, KEYS[6], eventStreamKey, jobId, timestamp, maxJobs, opts) end diff --git a/src/commands/removeChildDependency-1.lua b/src/commands/removeChildDependency-1.lua index 6eddcb2e60..5caa9aad43 100644 --- a/src/commands/removeChildDependency-1.lua +++ b/src/commands/removeChildDependency-1.lua @@ -25,7 +25,7 @@ if rcall("EXISTS", jobKey) ~= 1 then return -1 end if rcall("EXISTS", parentKey) ~= 1 then return -5 end -if removeParentDependencyKey(jobKey, false, parentKey, KEYS[1]) then +if removeParentDependencyKey(jobKey, false, parentKey, KEYS[1], nil) then rcall("HDEL", jobKey, "parentKey", "parent") return 0 diff --git a/src/commands/removeJob-2.lua b/src/commands/removeJob-2.lua index aaee237727..19c62de1d3 100644 --- a/src/commands/removeJob-2.lua +++ b/src/commands/removeJob-2.lua @@ -19,6 +19,7 @@ local rcall = redis.call --- @include "includes/destructureJobKey" --- @include "includes/getOrSetMaxEvents" --- @include "includes/isLocked" +--- @include "includes/removeDebounceKey" --- @include "includes/removeJobFromAnyState" --- @include "includes/removeJobKeys" --- @include "includes/removeParentDependencyKey" @@ -26,7 +27,7 @@ local rcall = redis.call local function removeJob( prefix, jobId, parentKey, removeChildren) local jobKey = prefix .. jobId; - removeParentDependencyKey(jobKey, false, parentKey) + removeParentDependencyKey(jobKey, false, parentKey, nil) if removeChildren == "1" then -- Check if this job has children @@ -66,6 +67,7 @@ local function removeJob( prefix, jobId, parentKey, removeChildren) local prev = removeJobFromAnyState(prefix, jobId) + removeDebounceKey(prefix, jobKey) if removeJobKeys(jobKey) > 0 then local maxEvents = getOrSetMaxEvents(KEYS[2]) rcall("XADD", prefix .. "events", "MAXLEN", "~", maxEvents, "*", "event", "removed", diff --git a/src/interfaces/debounce-options.ts b/src/interfaces/debounce-options.ts new file mode 100644 index 0000000000..02a34fa2fa --- /dev/null +++ b/src/interfaces/debounce-options.ts @@ -0,0 +1,14 @@ +/** + * Debounce options + */ +export interface DebounceOptions { + /** + * ttl in milliseconds + */ + ttl?: number; + + /** + * Identifier + */ + id: string; +} diff --git a/src/interfaces/index.ts b/src/interfaces/index.ts index 533fa81b31..65dfed0396 100644 --- a/src/interfaces/index.ts +++ b/src/interfaces/index.ts @@ -3,6 +3,7 @@ export * from './backoff-options'; export * from './base-job-options'; export * from './child-message'; export * from './connection'; +export * from './debounce-options'; export * from './flow-job'; export * from './ioredis-events'; export * from './job-json'; diff --git a/src/interfaces/job-json.ts b/src/interfaces/job-json.ts index 38d27e6d8a..79f104d934 100644 --- a/src/interfaces/job-json.ts +++ b/src/interfaces/job-json.ts @@ -18,6 +18,7 @@ export interface JobJson { parent?: ParentKeys; parentKey?: string; repeatJobKey?: string; + debounceId?: string; processedBy?: string; } @@ -37,6 +38,7 @@ export interface JobJsonRaw { returnvalue: string; parentKey?: string; parent?: string; + deid?: string; rjk?: string; atm?: string; ats?: string; diff --git a/src/types/job-options.ts b/src/types/job-options.ts index 51d5416312..4b0eea7b78 100644 --- a/src/types/job-options.ts +++ b/src/types/job-options.ts @@ -1,6 +1,11 @@ -import { BaseJobOptions } from '../interfaces'; +import { BaseJobOptions, DebounceOptions } from '../interfaces'; export type JobsOptions = BaseJobOptions & { + /** + * Debounce options. + */ + debounce?: DebounceOptions; + /** * If true, moves parent to failed. */ @@ -21,6 +26,11 @@ export type JobsOptions = BaseJobOptions & { * These fields are the ones stored in Redis with smaller keys for compactness. */ export type RedisJobOptions = BaseJobOptions & { + /** + * Debounce identifier. + */ + deid?: string; + /** * If true, moves parent to failed. */ diff --git a/tests/test_events.ts b/tests/test_events.ts index 69477a7a07..41d99c2839 100644 --- a/tests/test_events.ts +++ b/tests/test_events.ts @@ -398,6 +398,177 @@ describe('events', function () { }); }); + describe('when job is debounced when added again with same debounce id', function () { + describe('when ttl is provided', function () { + it('used a fixed time period and emits debounced event', async function () { + const testName = 'test'; + + const job = await queue.add( + testName, + { foo: 'bar' }, + { debounce: { id: 'a1', ttl: 2000 } }, + ); + + let debouncedCounter = 0; + let secondJob; + queueEvents.on('debounced', ({ jobId }) => { + if (debouncedCounter > 1) { + expect(jobId).to.be.equal(secondJob.id); + } else { + expect(jobId).to.be.equal(job.id); + } + debouncedCounter++; + }); + + await delay(1000); + await queue.add( + testName, + { foo: 'bar' }, + { debounce: { id: 'a1', ttl: 2000 } }, + ); + await queue.add( + testName, + { foo: 'bar' }, + { debounce: { id: 'a1', ttl: 2000 } }, + ); + await delay(1100); + secondJob = await queue.add( + testName, + { foo: 'bar' }, + { debounce: { id: 'a1', ttl: 2000 } }, + ); + await queue.add( + testName, + { foo: 'bar' }, + { debounce: { id: 'a1', ttl: 2000 } }, + ); + await queue.add( + testName, + { foo: 'bar' }, + { debounce: { id: 'a1', ttl: 2000 } }, + ); + await delay(100); + + expect(debouncedCounter).to.be.equal(4); + }); + + describe('when removing debounced job', function () { + it('removes debounce key', async function () { + const testName = 'test'; + + const job = await queue.add( + testName, + { foo: 'bar' }, + { debounce: { id: 'a1', ttl: 2000 } }, + ); + + let debouncedCounter = 0; + queueEvents.on('debounced', ({ jobId }) => { + debouncedCounter++; + }); + await job.remove(); + + await queue.add( + testName, + { foo: 'bar' }, + { debounce: { id: 'a1', ttl: 2000 } }, + ); + await delay(1000); + await queue.add( + testName, + { foo: 'bar' }, + { debounce: { id: 'a1', ttl: 2000 } }, + ); + await delay(1100); + const secondJob = await queue.add( + testName, + { foo: 'bar' }, + { debounce: { id: 'a1', ttl: 2000 } }, + ); + await secondJob.remove(); + + await queue.add( + testName, + { foo: 'bar' }, + { debounce: { id: 'a1', ttl: 2000 } }, + ); + await queue.add( + testName, + { foo: 'bar' }, + { debounce: { id: 'a1', ttl: 2000 } }, + ); + await delay(100); + + expect(debouncedCounter).to.be.equal(2); + }); + }); + }); + + describe('when ttl is not provided', function () { + it('waits until job is finished before removing debounce key', async function () { + const testName = 'test'; + + const worker = new Worker( + queueName, + async () => { + await delay(100); + await queue.add( + testName, + { foo: 'bar' }, + { debounce: { id: 'a1' } }, + ); + await delay(100); + await queue.add( + testName, + { foo: 'bar' }, + { debounce: { id: 'a1' } }, + ); + await delay(100); + }, + { + autorun: false, + connection, + prefix, + }, + ); + await worker.waitUntilReady(); + + let debouncedCounter = 0; + + const completing = new Promise(resolve => { + queueEvents.once('completed', ({ jobId }) => { + expect(jobId).to.be.equal('1'); + resolve(); + }); + + queueEvents.on('debounced', ({ jobId }) => { + debouncedCounter++; + }); + }); + + worker.run(); + + await queue.add(testName, { foo: 'bar' }, { debounce: { id: 'a1' } }); + + await completing; + + const secondJob = await queue.add( + testName, + { foo: 'bar' }, + { debounce: { id: 'a1' } }, + ); + + const count = await queue.getJobCountByTypes(); + + expect(count).to.be.eql(2); + + expect(debouncedCounter).to.be.equal(2); + expect(secondJob.id).to.be.equal('4'); + await worker.close(); + }); + }); + }); + it('should emit an event when a job becomes active', async () => { const worker = new Worker(queueName, async job => {}, { connection, diff --git a/tests/test_worker.ts b/tests/test_worker.ts index 066252c5f6..34cd191905 100644 --- a/tests/test_worker.ts +++ b/tests/test_worker.ts @@ -1948,7 +1948,7 @@ describe('workers', function () { await worker.waitUntilReady(); - const jobs = await Promise.all( + await Promise.all( Array.from({ length: concurrency }).map(() => queue.add('test', { bar: 'baz' }), ),