Skip to content

Commit

Permalink
perf(flow): add marker when moving parent to wait (python) (#2408)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Feb 3, 2024
1 parent 1eedf29 commit 6fb6896
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 32 deletions.
6 changes: 4 additions & 2 deletions src/commands/includes/removeParentDependencyKey.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
which requires code from "moveToFinished"
]]

-- Includes
--- @include "addJobInTargetList"
--- @include "destructureJobKey"
--- @include "getTargetQueueList"

local function moveParentToWait(parentPrefix, parentId, emitEvent)
local parentTarget = getTargetQueueList(parentPrefix .. "meta", parentPrefix .. "wait", parentPrefix .. "paused")
rcall("RPUSH", parentTarget, parentId)
local parentTarget, isPaused = getTargetQueueList(parentPrefix .. "meta", parentPrefix .. "wait", parentPrefix .. "paused")
addJobInTargetList(parentTarget, parentPrefix .. "marker", "RPUSH", isPaused, parentId)

if emitEvent then
local parentEventStream = parentPrefix .. "events"
Expand Down
70 changes: 40 additions & 30 deletions tests/test_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1486,41 +1486,51 @@ describe('workers', function () {
});
});

it('should not close the connection', async () => {
const connection = new IORedis(redisHost, { maxRetriesPerRequest: null });
const queueName2 = `test-shared-${v4()}`;

const queue2 = new Queue(queueName2, {
defaultJobOptions: { removeOnComplete: true },
connection,
prefix,
});

await new Promise<void>((resolve, reject) => {
connection.on('ready', async () => {
const worker1 = new Worker(queueName2, null, { connection, prefix });
const worker2 = new Worker(queueName2, null, { connection, prefix });
describe('when connection is passed into a queue', function () {
it('should not close the connection', async () => {
const connection = new IORedis(redisHost, {
maxRetriesPerRequest: null,
});
const queueName2 = `test-shared-${v4()}`;

try {
// There is no point into checking the ready status after closing
// since ioredis will not update it anyway:
// https://github.com/luin/ioredis/issues/614
expect(connection.status).to.be.equal('ready');
await worker1.close();
await worker2.close();
await connection.quit();
const queue2 = new Queue(queueName2, {
defaultJobOptions: { removeOnComplete: true },
connection,
prefix,
});

connection.on('end', () => {
resolve();
await new Promise<void>((resolve, reject) => {
connection.on('ready', async () => {
const worker1 = new Worker(queueName2, null, {
connection,
prefix,
});
} catch (err) {
reject(err);
}
const worker2 = new Worker(queueName2, null, {
connection,
prefix,
});

try {
// There is no point into checking the ready status after closing
// since ioredis will not update it anyway:
// https://github.com/luin/ioredis/issues/614
expect(connection.status).to.be.equal('ready');
await worker1.close();
await worker2.close();
await connection.quit();

connection.on('end', () => {
resolve();
});
} catch (err) {
reject(err);
}
});
});
});

await queue2.close();
await removeAllQueueData(new IORedis(redisHost), queueName2);
await queue2.close();
await removeAllQueueData(new IORedis(redisHost), queueName2);
});
});
});

Expand Down

0 comments on commit 6fb6896

Please sign in to comment.