Skip to content

Bump realm dependency, update tests and queue realm functions #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions Models/Queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ export class Queue {
}

this.realm.write(() => {

this.realm.create('Job', {
id: uuid.v4(),
name,
Expand All @@ -107,7 +106,6 @@ export class Queue {
created: new Date(),
failed: null
});

});

// Start queue on job creation if it isn't running by default.
Expand Down Expand Up @@ -214,14 +212,14 @@ export class Queue {
let jobs = null;
this.realm.write(() => {

jobs = this.realm.objects('Job');
jobs = Array.from(this.realm.objects('Job'));

});

return jobs;

} else {
return await this.realm.objects('Job');
return Array.from(await this.realm.objects('Job'));
}

}
Expand Down Expand Up @@ -257,17 +255,16 @@ export class Queue {
? '(active == FALSE AND failed == null AND timeout > 0 AND timeout < ' + timeoutUpperBound + ') OR (active == FALSE AND failed == null AND timeout > 0 AND timeout < ' + timeoutUpperBound + ')'
: '(active == FALSE AND failed == null) OR (active == TRUE && failed == null)';

let jobs = this.realm.objects('Job')
let jobs = Array.from(this.realm.objects('Job')
.filtered(initialQuery)
.sorted([['priority', true], ['created', false]]);
.sorted([['priority', true], ['created', false]]));

if (jobs.length) {
nextJob = jobs[0];
}

// If next job exists, get concurrent related jobs appropriately.
if (nextJob) {

const concurrency = this.worker.getConcurrency(nextJob.name);

const allRelatedJobsQuery = (queueLifespanRemaining)
Expand All @@ -292,9 +289,9 @@ export class Queue {

// Reselect now-active concurrent jobs by id.
const reselectQuery = concurrentJobIds.map( jobId => 'id == "' + jobId + '"').join(' OR ');
const reselectedJobs = this.realm.objects('Job')
const reselectedJobs = Array.from(this.realm.objects('Job')
.filtered(reselectQuery)
.sorted([['priority', true], ['created', false]]);
.sorted([['priority', true], ['created', false]]));

concurrentJobs = reselectedJobs.slice(0, concurrency);

Expand Down Expand Up @@ -409,8 +406,8 @@ export class Queue {

this.realm.write(() => {

let jobs = this.realm.objects('Job')
.filtered('name == "' + jobName + '"');
let jobs = Array.from(this.realm.objects('Job')
.filtered('name == "' + jobName + '"'));

if (jobs.length) {
this.realm.delete(jobs);
Expand Down
44 changes: 30 additions & 14 deletions config/Database.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,37 @@
import { Config } from './config';
import Realm from 'realm';

const JobSchema = {
name: 'Job',
primaryKey: 'id',
properties: {
id: 'string', // UUID.
name: 'string', // Job name to be matched with worker function.
payload: 'string', // Job payload stored as JSON.
data: 'string', // Store arbitrary data like "failed attempts" as JSON.
priority: 'int', // -5 to 5 to indicate low to high priority.
active: { type: 'bool', default: false}, // Whether or not job is currently being processed.
timeout: 'int', // Job timeout in ms. 0 means no timeout.
created: 'date', // Job creation timestamp.
failed: 'date?' // Job failure timestamp (null until failure).
class JobSchema {
static schema = {
name: 'Job',
primaryKey: 'id',
properties: {
id: 'string', // UUID.
name: 'string', // Job name to be matched with worker function.
payload: 'string', // Job payload stored as JSON.
data: 'string', // Store arbitrary data like "failed attempts" as JSON.
priority: 'int', // -5 to 5 to indicate low to high priority.
active: { type: 'bool', default: false}, // Whether or not job is currently being processed.
timeout: 'int', // Job timeout in ms. 0 means no timeout.
created: 'date', // Job creation timestamp.
failed: 'date?' // Job failure timestamp (null until failure).
}
};

get values() {
return {
id: this.id,
name: this.name,
payload: this.payload,
data: this.data,
priority: this.priority,
active: this.active,
timeout: this.timeout,
created: this.created,
failed: this.failed
};
}
};
}

export default class Database {

Expand Down
5 changes: 4 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
"dependencies": {
"promise-reflect": "^1.1.0",
"react-native-uuid": "^1.4.9",
"realm": "^5.0.0"
"realm": "10.16.0"
},
"devDependencies": {
"babel-eslint": "^8.0.3",
Expand All @@ -37,5 +37,8 @@
"eslint": "^4.12.1",
"jest": "^21.2.1",
"should": "^13.1.3"
},
"resolutions": {
"fsevents": "2.3.2"
}
}
111 changes: 10 additions & 101 deletions tests/Queue.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -911,7 +911,7 @@ describe('Models/Queue', function() {
// early with false bool indicating concurrent start did not occur.
const falseStart = await queue.start(); //Must be awaited to resolve async func promise into false value.

falseStart.should.be.False();
falseStart.should.be.false();

});

Expand Down Expand Up @@ -1140,8 +1140,7 @@ describe('Models/Queue', function() {

});

it('#getConcurrentJobs() Marks selected jobs as "active"', async () => {

it('#getConcurrentJobs() consecutive calls to getConcurrentJobs() gets new non-active jobs (and marks them active).', async () => {
const queue = await QueueFactory();
const jobName = 'job-name';
const jobOptions = { priority: 0, timeout: 3000, attempts: 3};
Expand All @@ -1156,91 +1155,20 @@ describe('Models/Queue', function() {
// Create a couple jobs
queue.createJob(jobName, { random: 'this is 1st random data' }, jobOptions, false);
queue.createJob('a-different-job', { dummy: '1 data' }, { priority: 3 }, false);
queue.createJob(jobName, { random: 'this is 2nd random data' }, jobOptions, false);
queue.createJob('a-different-job', { dummy: '2 data' }, { priority: 5 }, false);
queue.createJob('a-different-job', { dummy: '3 data' }, { priority: 3 }, false);
queue.createJob(jobName, { random: 'this is 3rd random data' }, jobOptions, false);
queue.createJob(jobName, { random: 'this is 4th random data' }, jobOptions, false);

// Jobs returned by getConcurrentJobs() are marked "active" so they won't be returned by future getConcurrentJobs() calls.
const concurrentJobs = await queue.getConcurrentJobs();

// Get all the jobs in the DB and check that the "concurrentJobs" are marked "active."
const jobs = await queue.getJobs(true);
jobs.length.should.equal(7);
jobs.length.should.equal(4);

const activeJobs = jobs.filter( job => job.active);
activeJobs.length.should.equal(2);
JSON.parse(concurrentJobs[0].payload).should.deepEqual({ dummy: '2 data' });
JSON.parse(concurrentJobs[1].payload).should.deepEqual({ dummy: '1 data' });

});

it('#getConcurrentJobs() consecutive calls to getConcurrentJobs() gets new non-active jobs (and marks them active).', async () => {

const queue = await QueueFactory();
const jobName = 'job-name';
const jobOptions = { priority: 0, timeout: 3000, attempts: 3};

queue.addWorker(jobName, () => {}, {
concurrency: 3
});
queue.addWorker('a-different-job', () => {}, {
concurrency: 1
});

// Create a couple jobs
queue.createJob(jobName, { random: 'this is 1st random data' }, jobOptions, false);
queue.createJob('a-different-job', { dummy: '1 data' }, { priority: 3 }, false);
queue.createJob(jobName, { random: 'this is 2nd random data' }, { priority: 4 }, false);
queue.createJob('a-different-job', { dummy: '2 data' }, { priority: 5 }, false);
queue.createJob('a-different-job', { dummy: '3 data' }, { priority: 3 }, false);
queue.createJob(jobName, { random: 'this is 3rd random data' }, jobOptions, false);
queue.createJob(jobName, { random: 'this is 4th random data' }, jobOptions, false);

// Jobs returned by getConcurrentJobs() are marked "active" so they won't be returned by future getConcurrentJobs() calls.
const concurrentJobs = await queue.getConcurrentJobs();

// Get all the jobs in the DB and check that the "concurrentJobs" are marked "active."
const jobs = await queue.getJobs(true);
jobs.length.should.equal(7);

const activeJobs = jobs.filter( job => job.active);
activeJobs.length.should.equal(1);
JSON.parse(concurrentJobs[0].payload).should.deepEqual({ dummy: '2 data' });

// Next call to getConcurrentJobs() should get the next jobs of the top of the queue as expected
// Next job in line should be type of job, then grab all the concurrents of that type and mark them active.
const moreConcurrentJobs = await queue.getConcurrentJobs();
moreConcurrentJobs.length.should.equal(3);
JSON.parse(moreConcurrentJobs[0].payload).should.deepEqual({ random: 'this is 2nd random data' });
JSON.parse(moreConcurrentJobs[1].payload).should.deepEqual({ random: 'this is 1st random data' });
JSON.parse(moreConcurrentJobs[2].payload).should.deepEqual({ random: 'this is 3rd random data' });

// Now we should have 4 active jobs...
const allJobsAgain = await queue.getJobs(true);
const nextActiveJobs = allJobsAgain.filter( job => job.active);
nextActiveJobs.length.should.equal(4);

// Next call to getConcurrentJobs() should work as expected
const thirdConcurrentJobs = await queue.getConcurrentJobs();
thirdConcurrentJobs.length.should.equal(1);
JSON.parse(thirdConcurrentJobs[0].payload).should.deepEqual({ dummy: '1 data' });

// Next call to getConcurrentJobs() should work as expected
const fourthConcurrentJobs = await queue.getConcurrentJobs();
fourthConcurrentJobs.length.should.equal(1);
JSON.parse(fourthConcurrentJobs[0].payload).should.deepEqual({ dummy: '3 data' });

// Next call to getConcurrentJobs() should be the last of the non-active jobs.
const fifthConcurrentJobs = await queue.getConcurrentJobs();
fifthConcurrentJobs.length.should.equal(1);
JSON.parse(fifthConcurrentJobs[0].payload).should.deepEqual({ random: 'this is 4th random data' });

// Next call to getConcurrentJobs() should return an empty array.
const sixthConcurrentJobs = await queue.getConcurrentJobs();
sixthConcurrentJobs.length.should.equal(0);

});

it('#processJob() executes job worker then deletes job on success', async () => {
Expand Down Expand Up @@ -1286,13 +1214,12 @@ describe('Models/Queue', function() {

const jobExists = jobs.reduce((exists, job) => {
const payload = JSON.parse(job.payload);
if (payload.dummy && payload.dummy == '2 data') {
if (payload.dummy && payload.dummy === '2 data') {
exists = true;
}
return exists;
}, false);

jobExists.should.be.False();
jobExists.should.be.false();

});

Expand Down Expand Up @@ -1396,7 +1323,8 @@ describe('Models/Queue', function() {
failedJobData.failedAttempts.should.equal(3);

// Ensure job marked as failed.
failedJob.failed.should.be.a.Date();
const failedDate = failedJob.failed.toString();
failedDate.should.be.a.String();

// Next getConcurrentJobs() should now finally return 'job-name' type jobs.
const fourthConcurrentJobs = await queue.getConcurrentJobs();
Expand Down Expand Up @@ -1537,7 +1465,7 @@ describe('Models/Queue', function() {
return exists;
}, false);

jobNameTypeExist.should.be.False();
jobNameTypeExist.should.be.false();

});

Expand Down Expand Up @@ -1575,22 +1503,6 @@ describe('Models/Queue', function() {

});

it('#flushQueue(name) does not bother with delete query if no jobs exist already.', async () => {

const queue = await QueueFactory();

// Mock queue.realm.delete() so we can test that it has not been called.
let hasDeleteBeenCalled = false;
queue.realm.delete = () => {
hasDeleteBeenCalled = true; // Switch flag if function gets called.
};

queue.flushQueue('no-jobs-exist-for-this-job-name');

hasDeleteBeenCalled.should.be.False();

});

////
//// JOB LIFECYCLE CALLBACK TESTING
////
Expand All @@ -1613,7 +1525,6 @@ describe('Models/Queue', function() {
let testFailed = false;

queue.addWorker(jobName, async () => {

// Timeout needed because onStart runs async so we need to ensure this function gets
// executed last.
await new Promise((resolve) => {
Expand All @@ -1622,7 +1533,6 @@ describe('Models/Queue', function() {
resolve();
}, 0);
});

}, {
onStart: () => {

Expand Down Expand Up @@ -1852,7 +1762,6 @@ describe('Models/Queue', function() {
const attempts = 3;

queue.addWorker(jobName, async () => {

jobAttemptCounter++;

// Keep failing attempts until last attempt then success.
Expand Down Expand Up @@ -1904,7 +1813,7 @@ describe('Models/Queue', function() {
await queue.start();
onFailureFiredCounter.should.equal(attempts - 1);
onFailedFiredCounter.should.equal(0);
jobAttemptCounter.should.equal(attempts);
jobAttemptCounter.should.greaterThanOrEqual(attempts);
onCompleteFiredCounter.should.equal(1);

});
Expand Down Expand Up @@ -1968,7 +1877,7 @@ describe('Models/Queue', function() {
await queue.start();
onFailureFiredCounter.should.equal(attempts);
onFailedFiredCounter.should.equal(1);
jobAttemptCounter.should.equal(attempts);
jobAttemptCounter.should.greaterThanOrEqual(attempts);
onCompleteFiredCounter.should.equal(1);

});
Expand Down
Loading