1
- var cluster = require ( 'cluster' ) ;
2
- var jobService = require ( "./Services/JobService" ) ;
3
- var PartitionService = require ( "./Services/PartitionService" ) ;
4
- var jobs = require ( "./Application/Jobs" ) ;
5
- var Lock = require ( "./Application/ExecuteLocked" ) ;
6
- var lock = new Lock ( ) ;
7
- var Worker = require ( "./Application/Worker" ) ;
8
- var validator = require ( "validator" ) ;
9
- var utils = require ( "./Application/Utils" ) ;
10
- var variables = require ( "./Application/CommonVariables" ) ;
1
+ "use strict"
11
2
12
- var workers = [ ] ;
13
- var workerPartitionIndex = 0 ;
14
- var numberOfWorkers ;
15
- var logger ;
3
+ const cluster = require ( 'cluster' ) ;
4
+ const jobService = require ( "./Services/JobService" ) ;
5
+ const PartitionService = require ( "./Services/PartitionService" ) ;
6
+ const jobs = require ( "./Application/Jobs" ) ;
7
+ const Lock = require ( "./Application/ExecuteLocked" ) ;
8
+ const lock = new Lock ( ) ;
9
+ const Worker = require ( "./Application/Worker" ) ;
10
+ const validator = require ( "validator" ) ;
11
+ const utils = require ( "./Application/Utils" ) ;
12
+ const variables = require ( "./Application/CommonVariables" ) ;
16
13
17
- var defaultConfiguration = {
18
- numberOfWorkers : 1 ,
19
- cleanIdlePartitionsAfterMinutes : 15 ,
20
- loggerLevel : "error" ,
21
- consoleLogger : true ,
22
- fileLogger : true ,
23
- fileLoggerPath : "./logger"
14
+ let _workers = [ ] ;
15
+ let _workerPartitionIndex = 0 ;
16
+ let _numberOfWorkers ;
17
+ let _logger ;
18
+
19
+ const defaultConfiguration = {
20
+ numberOfWorkers : 1 ,
21
+ cleanIdlePartitionsAfterMinutes : 15 ,
22
+ loggerLevel : "error" ,
23
+ consoleLogger : true ,
24
+ fileLogger : true ,
25
+ fileLoggerPath : "./logger"
24
26
} ;
25
27
26
- function Partitioner ( configuration ) {
27
- if ( cluster . isWorker )
28
- throw new Error ( "a worker is trying to instantiate a partitioner" ) ;
29
-
30
- if ( configuration !== undefined )
31
- validate ( configuration ) ;
32
-
33
- var config = configuration !== undefined ? configuration : defaultConfiguration ;
34
- numberOfWorkers = utils . coalesce ( config . numberOfWorkers , defaultConfiguration . numberOfWorkers ) ;
35
- this . partitionService = new PartitionService ( utils . coalesce ( config . cleanIdlePartitionsAfterMinutes , defaultConfiguration . cleanIdlePartitionsAfterMinutes ) ) ;
36
-
37
- var processEnv = { } ;
38
-
39
- var Logger = require ( "./Application/Logger" ) ;
40
- var consoleLogger = utils . coalesce ( config . consoleLogger , defaultConfiguration . consoleLogger ) ;
41
- var fileLogger = utils . coalesce ( config . fileLogger , defaultConfiguration . fileLogger ) ;
42
- var fileLoggerPath = utils . coalesce ( config . fileLoggerPath , defaultConfiguration . fileLoggerPath ) ;
43
- var loggerLevel = utils . coalesce ( config . loggerLevel , defaultConfiguration . loggerLevel ) ;
44
- Logger . new ( consoleLogger , loggerLevel , fileLogger , fileLoggerPath ) . then ( function ( log ) {
45
- logger = log ;
46
- processEnv [ variables . loggerLevel ] = loggerLevel ;
47
- processEnv [ variables . consoleLogger ] = consoleLogger ;
48
- processEnv [ variables . fileLogger ] = fileLogger ;
49
- processEnv [ variables . fileLoggerPath ] = fileLoggerPath ;
50
-
51
- for ( var i = 0 ; i < numberOfWorkers ; i ++ ) {
52
- workers . push ( new Worker ( cluster . fork ( processEnv ) ) ) ;
28
+ class Partitioner {
29
+ constructor ( configuration ) {
30
+ if ( cluster . isWorker )
31
+ throw new Error ( "a worker is trying to instantiate a partitioner" )
32
+
33
+ if ( configuration )
34
+ validate ( configuration )
35
+
36
+ const config = configuration ? configuration : defaultConfiguration
37
+ _numberOfWorkers = utils . coalesce ( config . numberOfWorkers , defaultConfiguration . numberOfWorkers )
38
+
39
+ this . partitionService = new PartitionService ( utils . coalesce ( config . cleanIdlePartitionsAfterMinutes , defaultConfiguration . cleanIdlePartitionsAfterMinutes ) )
40
+
41
+ let processEnv = { }
42
+ const Logger = require ( "./Application/Logger" )
43
+ const consoleLogger = utils . coalesce ( config . consoleLogger , defaultConfiguration . consoleLogger )
44
+ const fileLogger = utils . coalesce ( config . fileLogger , defaultConfiguration . fileLogger )
45
+ const fileLoggerPath = utils . coalesce ( config . fileLoggerPath , defaultConfiguration . fileLoggerPath )
46
+ const loggerLevel = utils . coalesce ( config . loggerLevel , defaultConfiguration . loggerLevel )
47
+ Logger . new ( consoleLogger , loggerLevel , fileLogger , fileLoggerPath ) . then ( log => {
48
+ _logger = log
49
+ processEnv [ variables . loggerLevel ] = loggerLevel
50
+ processEnv [ variables . consoleLogger ] = consoleLogger
51
+ processEnv [ variables . fileLogger ] = fileLogger
52
+ processEnv [ variables . fileLoggerPath ] = fileLoggerPath
53
+
54
+ for ( var i = 0 ; i < _numberOfWorkers ; i ++ ) {
55
+ _workers . push ( new Worker ( cluster . fork ( processEnv ) ) ) ;
53
56
}
54
- }
55
- ) ;
56
- }
57
+ } )
58
+ }
57
59
58
- Partitioner . prototype . enqueueJob = function ( job , callback ) {
59
- var self = this ;
60
- if ( job === null
61
- || job === undefined
62
- || job . id === null
63
- || job . id === undefined
64
- || job . partitionId === null
65
- || job . partitionId === undefined
66
- || job . type === null
67
- || job . type === undefined )
68
- throw new Error ( "Job null or invalid, should contain id, partitionId, type, data: {}" ) ;
69
-
70
- lock . execWrite ( function ( ) {
71
- return self . partitionService . get ( job . partitionId )
72
- . then ( function ( partition ) {
73
- if ( partition == null ) {
74
- var index = ++ workerPartitionIndex % numberOfWorkers ;
75
- return self . partitionService . push ( job . partitionId , workers [ index ] . worker ) ;
76
- } else {
77
- return partition ;
78
- }
79
- } ) ;
80
- } ) . then ( function ( partition ) {
81
- jobService . push ( job . id , callback ) . then ( function ( ) {
82
- logger . debug ( "jobId: %d, partitionId: %d, type: %s, pushed" , job . id , job . partitionId , job . type ) ;
83
- partition . worker . send ( job ) ;
84
- } ) ;
85
- } ) ;
86
- } ;
60
+ enqueueJob ( job , callback ) {
61
+ if ( ! utils . areNotNull ( job , job . id , job . partitionId , job . type ) )
62
+ throw new Error ( "Job null or invalid, should contain id, partitionId, type, data: {}" )
63
+
64
+ lock . execWrite ( ( ) => {
65
+ return this . partitionService . get ( job . partitionId )
66
+ . then ( partition => {
67
+ if ( utils . isNull ( partition ) ) {
68
+ const index = ++ _workerPartitionIndex % _numberOfWorkers
69
+ return this . partitionService . push ( job . partitionId , _workers [ index ] . worker )
70
+ } else {
71
+ return partition
72
+ }
73
+ } )
74
+ } ) . then ( partition => {
75
+ jobService . push ( job . id , callback ) . then ( ( ) => {
76
+ _logger . debug ( "jobId: %d, partitionId: %d, type: %s, pushed" , job . id , job . partitionId , job . type )
77
+ partition . worker . send ( job )
78
+ } )
79
+ } )
80
+ }
81
+ }
87
82
88
- function validate ( configuration ) {
89
- if ( configuration . numberOfWorkers !== undefined && ! validator . isInt ( configuration . numberOfWorkers , { min :1 } ) )
83
+ function validate ( configuration ) {
84
+ if ( configuration . numberOfWorkers !== undefined && ! validator . isInt ( configuration . numberOfWorkers , { min : 1 } ) )
90
85
throw new Error ( "numberOfWorkers should be an integer >= 1" ) ;
91
- if ( configuration . cleanIdlePartitionsAfterMinutes !== undefined && ! validator . isInt ( configuration . cleanIdlePartitionsAfterMinutes , { min :1 } ) )
86
+ if ( configuration . cleanIdlePartitionsAfterMinutes !== undefined && ! validator . isInt ( configuration . cleanIdlePartitionsAfterMinutes , { min : 1 } ) )
92
87
throw new Error ( "cleanIdlePartitionsAfterMinutes should be an integer >= 1" ) ;
93
- if ( configuration . loggerLevel !== undefined && ! (
94
- validator . equals ( configuration . loggerLevel , 'debug' )
88
+ if ( configuration . loggerLevel !== undefined && ! (
89
+ validator . equals ( configuration . loggerLevel , 'debug' )
95
90
|| validator . equals ( configuration . loggerLevel , 'info' )
96
91
|| validator . equals ( configuration . loggerLevel , 'warn' )
97
92
|| validator . equals ( configuration . loggerLevel , 'error' ) )
98
- )
93
+ )
99
94
throw new Error ( "loggerLevel should be debug, info, warn or error" ) ;
100
- if ( configuration . consoleLogger !== undefined && ! (
101
- validator . equals ( configuration . consoleLogger , true )
95
+ if ( configuration . consoleLogger !== undefined && ! (
96
+ validator . equals ( configuration . consoleLogger , true )
102
97
|| validator . equals ( configuration . consoleLogger , false ) )
103
- )
98
+ )
104
99
throw new Error ( "consoleLogger should be true or false" ) ;
105
- if ( configuration . fileLogger !== undefined && ! (
106
- validator . equals ( configuration . fileLogger , true )
100
+ if ( configuration . fileLogger !== undefined && ! (
101
+ validator . equals ( configuration . fileLogger , true )
107
102
|| validator . equals ( configuration . fileLogger , false ) )
108
- )
103
+ )
109
104
throw new Error ( "fileLogger should be true or false" ) ;
110
- if ( configuration . fileLoggerPath !== undefined && typeof ( configuration . fileLoggerPath ) !== typeof ( defaultConfiguration . fileLoggerPath ) )
105
+ if ( configuration . fileLoggerPath !== undefined && typeof ( configuration . fileLoggerPath ) !== typeof ( defaultConfiguration . fileLoggerPath ) )
111
106
throw new Error ( "fileLoggerPath should be a string" ) ;
112
107
}
113
108
114
109
module . exports = {
115
110
Partitioner : Partitioner ,
116
- registerJob : function ( title , func ) {
111
+ registerJob : function ( title , func ) {
117
112
jobs [ title ] = func ;
118
113
}
119
114
} ;
0 commit comments