Skip to content

Commit 38f4661

Browse files
Gennaro Del SorboGennaro Del Sorbo
Gennaro Del Sorbo
authored and
Gennaro Del Sorbo
committed
refactor
1 parent 1acf234 commit 38f4661

File tree

6 files changed

+315
-91
lines changed

6 files changed

+315
-91
lines changed

Application/Logger.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ const cluster = require("cluster")
77

88
let logger
99

10-
function newLogger(enableConsoleLogging, loggerLevel, enableFileLogger, fileLoggerPath) {
10+
let newLogger = (enableConsoleLogging, loggerLevel, enableFileLogger, fileLoggerPath) => {
1111
return new Promise(resolve => {
1212

1313
logger = new (winston.Logger)({

Application/Utils.js

+10-6
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
1-
function Utils(){}
1+
"use strict"
22

3-
Utils.prototype.coalesce = function(obj, other){
4-
if(!(obj === null) && !(obj === undefined))
5-
return obj;
6-
return other;
7-
};
3+
class Utils {
4+
constructor() {}
5+
6+
coalesce(obj, other) {
7+
if(!(obj === null) && !(obj === undefined))
8+
return obj;
9+
return other;
10+
}
11+
}
812

913
module.exports = new Utils();

Application/Worker.js

+51-67
Original file line numberDiff line numberDiff line change
@@ -1,78 +1,62 @@
1-
var jobService = require("../Services/JobService");
2-
var cluster = require("cluster");
3-
var jobs = require("./Jobs");
4-
var ReadWriteLock = require("rwlock");
5-
var queueLock = new ReadWriteLock();
6-
var progressLock = new ReadWriteLock();
7-
var Logger = require("./Logger");
8-
var variables = require("./CommonVariables");
1+
"use strict"
92

10-
function Worker(worker){
11-
this.worker = worker;
12-
13-
worker.on('message', function(message) {
14-
if(message.err != undefined){
15-
jobService.error(message.id, message.err);
16-
return;
17-
}
18-
19-
Logger.instance().debug("master received completed notify for jobId %d", message.id);
20-
jobService.done(message.id);
21-
});
3+
const jobService = require("../Services/JobService")
4+
const cluster = require("cluster")
5+
const jobs = require("./Jobs")
6+
const Logger = require("./Logger")
7+
const variables = require("./CommonVariables")
8+
9+
class Worker {
10+
constructor(worker) {
11+
this.worker = worker
12+
13+
worker.on('message', message => {
14+
if (message.err) {
15+
jobService.error(message.id, message.err)
16+
return
17+
}
18+
19+
Logger.instance().debug("master received completed notify for jobId %d", message.id)
20+
jobService.done(message.id)
21+
})
22+
}
2223
}
2324

24-
var queue = [];
25-
var inProgress = false;
25+
let queue = []
2626

27-
if(cluster.isWorker) {
28-
Logger.new(process.env[variables.consoleLogger] === "true", process.env[variables.loggerLevel],
29-
process.env[variables.fileLogger] === "true", process.env[variables.fileLoggerPath]).then(function(log){
30-
log.info("worker %d registered", process.pid);
31-
});
32-
33-
process.on('message', function(job) {
34-
Logger.instance().debug("job %d received", job.id);
35-
queueLock.writeLock(function(release){
36-
queue.push(function() { return jobs.executeJob(job) });
37-
release();
38-
});
39-
tryProcessQueue(inProgress);
40-
});
27+
function *getFromQueue() {
28+
while(queue.length > 0)
29+
yield queue.shift()
4130
}
4231

43-
function tryProcessQueue(isInProgress){
44-
var mustStop = false;
45-
progressLock.writeLock(function(release){
46-
if(isInProgress)
47-
mustStop = true;
48-
inProgress = true;
49-
release();
50-
});
51-
52-
if(mustStop)
53-
return;
32+
let q = getFromQueue();
33+
34+
let processQueue = () => {
35+
let job = q.next().value
36+
if(!job) {
37+
q = getFromQueue();
38+
return processQueue();
39+
}
5440

55-
var job;
56-
queueLock.writeLock(function(release){
57-
job = queue.shift();
58-
release();
59-
});
60-
job().then(function(){
61-
tryContinueProcess();
62-
}).catch(function(err){
63-
tryContinueProcess();
64-
});
41+
job().then(() => {
42+
processQueue()
43+
}).catch(() => {
44+
processQueue()
45+
})
6546
}
6647

67-
function tryContinueProcess(){
68-
if(queue.length > 0)
69-
tryProcessQueue(false);
70-
else {
71-
progressLock.writeLock(function(release){
72-
inProgress = false;
73-
release();
74-
});
75-
}
48+
if (cluster.isWorker) {
49+
Logger.new(process.env[variables.consoleLogger] === "true", process.env[variables.loggerLevel],
50+
process.env[variables.fileLogger] === "true", process.env[variables.fileLoggerPath]).then(log => {
51+
log.info("worker %d registered", process.pid)
52+
})
53+
54+
process.on('message', job => {
55+
Logger.instance().debug("job %d received", job.id)
56+
queue.push(() => { return jobs.executeJob(job) })
57+
})
58+
59+
processQueue()
7660
}
7761

78-
module.exports = Worker;
62+
module.exports = Worker

Test/Services/PartitionServiceTest.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ describe("PartitionService tests", function(){
4747
exceptionThrown.should.be.exactly(true);
4848
});
4949

50-
it("PartitionService parameter mist be greater than 0", function(){
50+
it("PartitionService parameter must be greater than 0", function(){
5151
var exceptionThrown = false;
5252

5353
try {

Test/partitionerTest.js

+16-16
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
var mkdirp = function(path, callback){
1+
var mkdirp = function (path, callback) {
22
callback();
33
};
44
var proxyquire = require("proxyquire");
5-
proxyquire("../Application/Logger", {'mkdirp': mkdirp});
5+
proxyquire("../Application/Logger", { 'mkdirp': mkdirp });
66

77
var sinon = require("sinon");
88
var should = require("should");
@@ -12,22 +12,22 @@ var util = require("util");
1212

1313
process.setMaxListeners(0);
1414

15-
describe("Partitioner", function() {
16-
15+
describe("Partitioner", function () {
16+
1717
var forkStub;
18-
19-
beforeEach(function() {
18+
19+
beforeEach(function () {
2020
var workerObj = {
21-
on: function(){}
21+
on: function () { }
2222
};
23-
forkStub = sinon.stub(cluster, "fork").returns(workerObj);
23+
forkStub = sinon.stub(cluster, "fork").returns(workerObj);
2424
});
25-
26-
afterEach(function() {
27-
forkStub.restore();
25+
26+
afterEach(function () {
27+
forkStub.restore();
2828
});
29-
30-
describe("Configuration", function(){
29+
30+
describe("Configuration", function () {
3131

3232
it("if configuration is undefined, then instantiate 1 worker", function(done){
3333

@@ -183,13 +183,13 @@ describe("Partitioner", function() {
183183
}, 200);
184184
});
185185

186-
it("if consoleLogger is false then console logger is disabled", function(done){
186+
it("if consoleLogger is false then console logger is disabled", function (done) {
187187
cluster.isWorker = false;
188188
var partitioner = new Partitioner({
189189
consoleLogger: false
190190
});
191191

192-
setTimeout(function() {
192+
setTimeout(function () {
193193
var logger = require("../Application/Logger").instance();
194194
should.not.exists(logger.transports.console);
195195
done();
@@ -215,7 +215,7 @@ describe("Partitioner", function() {
215215
cluster.isWorker = false;
216216
var partitioner = new Partitioner();
217217

218-
setTimeout(function() {
218+
setTimeout(function() {
219219
var logger = require("../Application/Logger").instance();
220220
should.exists(logger.transports.file);
221221
done();

0 commit comments

Comments
 (0)