Skip to content

Commit 250bf54

Browse files
committed
Logger improved
1 parent 1b1a444 commit 250bf54

File tree

9 files changed

+94
-72
lines changed

9 files changed

+94
-72
lines changed

Application/ExecuteLocked.js

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
var ReadWriteLock = require("rwlock");
22
var q = require("q");
3-
var logger = require("./Logger");
3+
var Logger = require("./Logger")
44

55
function ExecuteLocked(){
66
this.lock = new ReadWriteLock();
77
}
88

99
ExecuteLocked.prototype.execRead = function(func){
1010
var deferred = q.defer();
11+
var logger = Logger.instance();
1112
this.lock.readLock(function(release){
1213
func().then(function(obj){
1314
release();
@@ -22,6 +23,7 @@ ExecuteLocked.prototype.execRead = function(func){
2223

2324
ExecuteLocked.prototype.execWrite = function(func){
2425
var deferred = q.defer();
26+
var logger = Logger.instance();
2527
this.lock.writeLock(function(release){
2628
func().then(function(obj){
2729
release();

Application/Jobs.js

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
var logger = require("./Logger");
1+
var Logger = require("./Logger");
22
var Message = require("../Entities/Message");
33
var util = require("util");
44
var q = require("q");
@@ -7,6 +7,7 @@ function Jobs(){}
77

88
Jobs.prototype.executeJob = function(job) {
99
var self = this;
10+
var logger = Logger.instance();
1011
return q.Promise(function(resolve, reject){
1112
logger.debug("executing job id: %d, partitionId: %d, type: %s, pid: %d", job.id, job.partitionId, job.type, process.pid);
1213
if(self[job.type] === undefined) {

Application/Logger.js

+35-24
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,42 @@
11
var winston = require("winston");
22
var fs = require("fs");
33
var util = require("util");
4+
var q = require("q");
45

5-
var binPath = "bin";
6-
var servicePath = "parallel-queue-partitioner";
7-
var fullpath = util.format("./%s/%s", binPath, servicePath);
6+
var logger;
87

9-
if (!fs.existsSync(util.format("./%s", binPath))) {
10-
fs.mkdirSync(util.format("./%s", binPath));
8+
function newLogger(){
9+
return q.Promise(function(resolve){
10+
var binPath = "bin";
11+
var servicePath = "parallel-queue-partitioner";
12+
var fullpath = util.format("./%s/%s", binPath, servicePath);
13+
14+
if (!fs.existsSync(util.format("./%s", binPath))) {
15+
fs.mkdirSync(util.format("./%s", binPath));
16+
}
17+
18+
if (!fs.existsSync(fullpath)) {
19+
fs.mkdirSync(fullpath);
20+
}
21+
22+
logger = new (winston.Logger)({
23+
transports: [
24+
new (winston.transports.Console)(),
25+
new (winston.transports.File)({
26+
filename: util.format("%s/pid-%s-partitioner.log", fullpath, process.pid),
27+
handleExceptions: true,
28+
exitOnError: false,
29+
level: 'error', //info, warning, error
30+
maxsize: 625000,
31+
zippedArchive: true
32+
})
33+
]
34+
});
35+
resolve(logger);
36+
});
1137
}
1238

13-
if (!fs.existsSync(fullpath)) {
14-
fs.mkdirSync(fullpath);
15-
}
16-
17-
var logger = new (winston.Logger)({
18-
transports: [
19-
new (winston.transports.Console)(),
20-
new (winston.transports.File)({
21-
filename: util.format("%s/pid-%s-partitioner.log", fullpath, process.pid),
22-
handleExceptions: true,
23-
exitOnError: false,
24-
level: 'error', //info, warning, error
25-
maxsize: 625000,
26-
zippedArchive: true
27-
})
28-
]
29-
});
30-
31-
module.exports = logger;
39+
module.exports = {
40+
new: newLogger,
41+
instance: function(){ return logger; }
42+
};

Application/Worker.js

+9-7
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
var jobService = require("../Services/JobService");
22
var cluster = require("cluster");
33
var jobs = require("./Jobs");
4-
var logger = require("./Logger");
54
var ReadWriteLock = require("rwlock");
65
var queueLock = new ReadWriteLock();
76
var progressLock = new ReadWriteLock();
7+
var Logger = require("./Logger");
88

99
function Worker(worker){
1010
this.worker = worker;
@@ -15,7 +15,7 @@ function Worker(worker){
1515
return;
1616
}
1717

18-
logger.debug("master received completed notify for jobId %d", message.id);
18+
Logger.instance().debug("master received completed notify for jobId %d", message.id);
1919
jobService.done(message.id);
2020
});
2121
}
@@ -24,13 +24,15 @@ var queue = [];
2424
var inProgress = false;
2525

2626
if(cluster.isWorker) {
27-
logger.transports.file.level = process.env["loggerLevel"];
28-
logger.transports.console.level = process.env["loggerLevel"];
27+
Logger.new().then(function(log){
28+
log.transports.file.level = process.env["loggerLevel"];
29+
log.transports.console.level = process.env["loggerLevel"];
30+
31+
log.info("worker %d registered", process.pid);
32+
});
2933

30-
logger.info("worker %d registered", process.pid);
31-
3234
process.on('message', function(job) {
33-
logger.debug("job %d received", job.id);
35+
Logger.instance().debug("job %d received", job.id);
3436
queueLock.writeLock(function(release){
3537
queue.push(function() { return jobs.executeJob(job) });
3638
release();

Partitioner.js

+19-15
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,12 @@ var jobs = require("./Application/Jobs");
55
var Lock = require("./Application/ExecuteLocked");
66
var lock = new Lock();
77
var Worker = require("./Application/Worker");
8-
var logger = require("./Application/Logger");
98
var validator = require("validator");
109

11-
1210
var workers = [];
1311
var workerPartitionIndex = 0;
1412
var numberOfWorkers;
13+
var logger;
1514

1615
var defaultConfiguration = {
1716
numberOfWorkers: 1,
@@ -20,6 +19,9 @@ var defaultConfiguration = {
2019
};
2120

2221
function Partitioner(configuration) {
22+
if(cluster.isWorker)
23+
throw new Error("a worker is trying to instantiate a partitioner");
24+
2325
if(configuration !== undefined)
2426
validate(configuration);
2527

@@ -28,20 +30,22 @@ function Partitioner(configuration) {
2830
this.partitionService = new PartitionService(config.cleanIdlePartitionsAfterMinutes || 15);
2931

3032
var processEnv = {};
31-
if(config.loggerLevel !== undefined){
32-
logger.transports.file.level = config.loggerLevel;
33-
logger.transports.console.level = config.loggerLevel;
34-
processEnv["loggerLevel"] = config.loggerLevel;
35-
}else {
36-
processEnv["loggerLevel"] = defaultConfiguration.loggerLevel;
37-
}
3833

39-
if(cluster.isWorker)
40-
throw new Error("a worker is trying to instantiate a partitioner");
41-
42-
for(var i=0; i < numberOfWorkers; i++){
43-
workers.push(new Worker(cluster.fork(processEnv)));
44-
}
34+
var Logger = require("./Application/Logger");
35+
Logger.new().then(function(log){
36+
logger = log;
37+
if(config.loggerLevel !== undefined){
38+
logger.transports.file.level = config.loggerLevel;
39+
logger.transports.console.level = config.loggerLevel;
40+
processEnv["loggerLevel"] = config.loggerLevel;
41+
}else {
42+
processEnv["loggerLevel"] = defaultConfiguration.loggerLevel;
43+
}
44+
45+
for(var i=0; i < numberOfWorkers; i++){
46+
workers.push(new Worker(cluster.fork(processEnv)));
47+
}
48+
});
4549
}
4650

4751
Partitioner.prototype.enqueueJob = function(job, callback){

README.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,8 @@ var partitioner = new Partitioner({
112112

113113
## Logger
114114

115-
A logger will be created in the folder ./bin/parallel-queue-partitioner/partitioner.log
115+
A logger will be created in the folder ./bin/parallel-queue-partitioner.
116+
Each worker will create its own log.
116117

117118
## Exemple how to use the partitioner with redis
118119

Samples/test-redis.js

+19-19
Original file line numberDiff line numberDiff line change
@@ -37,25 +37,25 @@ var id=0;
3737

3838
if(cluster.isMaster) {
3939
console.log('pushing messages');
40-
for (var i = 0; i < 50; i++) {
41-
queue.create('jobs', {
42-
partitionId: 0,
43-
type: "sequential",
44-
sequence: i
45-
}).save(function(err) {
46-
if (err) console.log(err);
47-
});
48-
}
40+
// for (var i = 0; i < 50; i++) {
41+
// queue.create('jobs', {
42+
// partitionId: 0,
43+
// type: "sequential",
44+
// sequence: i
45+
// }).save(function(err) {
46+
// if (err) console.log(err);
47+
// });
48+
// }
4949

50-
for (var i = 0; i < 50; i++) {
51-
queue.create('jobs', {
52-
partitionId: 1,
53-
type: "sequential",
54-
sequence: i
55-
}).save(function(err) {
56-
if (err) console.log(err);
57-
});
58-
}
50+
// for (var i = 0; i < 50; i++) {
51+
// queue.create('jobs', {
52+
// partitionId: 1,
53+
// type: "sequential",
54+
// sequence: i
55+
// }).save(function(err) {
56+
// if (err) console.log(err);
57+
// });
58+
// }
5959

6060
for (var i = 0; i < 150; i++) {
6161
queue.create('jobs', {
@@ -72,7 +72,7 @@ if(cluster.isMaster) {
7272
function start(){
7373
var partitioner = new Partitioner({
7474
numberOfWorkers: 4,
75-
loggerLevel: 'info'
75+
loggerLevel: 'debug'
7676
});
7777

7878
setTimeout(function(){

Services/JobService.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
var _ = require("underscore");
22
var q = require("q");
3-
var logger = require("../Application/Logger");
3+
var Logger = require("../Application/Logger");
44

55

66
var jobs = [];
@@ -29,7 +29,7 @@ JobService.prototype.push = function(id, callback) {
2929
jobs.push(new Job(id, callback));
3030
resolve();
3131
}catch(ex){
32-
logger.error(ex);
32+
Logger.instance().error(ex);
3333
reject(ex);
3434
}
3535
});

Services/PartitionService.js

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
var _ = require("underscore");
22
var moment = require("moment");
33
var q = require("q");
4-
var logger = require("../Application/Logger");
4+
var Logger = require("../Application/Logger");
55
var util = require("util");
66

77
var Lock = require("../Application/ExecuteLocked");
@@ -17,6 +17,7 @@ function Partition(partitionId, worker) {
1717
this.updatedAt = moment().utc().format();
1818

1919
setInterval(function() {
20+
var logger = Logger.instance();
2021
logger.info(util.format("Partition cleanup fired for %d", partitionId));
2122
if(self.updatedAt <= moment().utc().subtract(cleanIdlePartitionsAfterMinutes, 'minutes').format())
2223
lock.execWrite(function(){
@@ -51,7 +52,7 @@ PartitionService.prototype.get = function(partitionId) {
5152
resolve(maybePartition);
5253

5354
} catch(ex){
54-
logger.error(ex);
55+
Logger.instance().error(ex);
5556
reject(ex);
5657
}
5758
});

0 commit comments

Comments
 (0)