-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
162 lines (145 loc) · 5.78 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
'use strict'
const starter = require('./lib/process-utils').starter;
const restarter = require('./lib/process-utils').restarter;
const stopper = require('./lib/process-utils').stopper;
const status = require('./lib/constants').status;
const minimalConfigChecker = require('./lib/constants').minimalConfigChecker;
const customEvts = require('./lib/constants').events;
const processEvt = require('./lib/constants').processEvt;
const Type = require('type-of-is');
const Rx = require('rx');
/**
 * Wraps a single child process: spawns it from a config object, stops and
 * restarts it, feeds it input, and exposes what happens to it as Rx streams.
 *
 * All events travel through one internal Rx.Subject (`_evtStream`); the public
 * stream getters are filtered views of it, keyed by event type. The current
 * lifecycle status is mirrored into a BehaviorSubject so it can be read
 * synchronously through `processStatus`.
 */
class SpawnWatch {
  /**
   * @param {Object} [options] - overrides for the defaults below.
   * @param {boolean} [options.ipc=false] - default is no IPC channel.
   * @param {Object} [options.encodings] - per-stream encodings (stdin, stdout,
   *   stderr), each defaulting to 'utf-8'. A partial object only overrides the
   *   keys it names.
   */
  constructor(options) {
    this._status = new Rx.BehaviorSubject(status.stopped);
    this._childProcess = null;
    this._currentConfig = null;
    this._evtStream = new Rx.Subject();

    // Object.assign is shallow: merging `options` alone would replace the whole
    // `encodings` object and lose the defaults for any key the caller omitted,
    // so the nested object is merged separately.
    const encodings = Object.assign(
      { stdin: 'utf-8', stdout: 'utf-8', stderr: 'utf-8' },
      options && options.encodings
    );
    this._options = Object.assign({ ipc: false }, options, { encodings: encodings });

    // status update from process events
    this.processEventStream.subscribe(newStatus => {
      this._status.onNext(newStatus);
    });

    // react to status changes: once stopped, forget the process and its config
    this._status.subscribe(newStatus => {
      if (newStatus === status.stopped) {
        this._currentConfig = null;
        this._childProcess = null;
      }
    });
  }

  /**
   * Spawn a process according to a config object.
   *
   * Emits `pendingStart` before calling `starter` and `started` after it
   * returns. If `starter` throws, `stopped` is emitted before the error is
   * rethrown so the instance does not stay stuck in `pendingStart`.
   *
   * @param {Object} config - must pass `minimalConfigChecker`.
   * @returns {boolean} true when the process was spawned, false when the
   *   status is not `stopped` or the config is missing/invalid.
   */
  start(config) {
    const canStart = this._status.value === status.stopped &&
      config && minimalConfigChecker(config);
    if (!canStart) {
      // conditions not met to spawn process
      return false;
    }

    this._emitStatus(status.pendingStart);
    this._currentConfig = config;
    try {
      this._childProcess = starter(config, this._evtStream, this._options);
    } catch (err) {
      // roll the status back (which also clears the stored config) and let the
      // caller see the original error
      this._emitStatus(status.stopped);
      throw err;
    }
    // fire started evt
    this._emitStatus(status.started);
    return true;
  }

  /**
   * Stop the running process.
   *
   * Emits `pendingStop` and hands the child to `stopper`.
   *
   * @returns {boolean} true when a stop was requested, false when there is no
   *   child process or the status is not `started`.
   */
  stop() {
    if (!this._childProcess || this._status.value !== status.started) {
      // conditions not met to kill process
      return false;
    }
    this._emitStatus(status.pendingStop);
    stopper(this._childProcess, this._evtStream);
    return true;
  }

  /**
   * Stop the process and spawn it again once it reports `stopped`.
   *
   * @param {Object} [config] - new config to start with; when omitted the
   *   config of the running process is reused.
   * @returns {boolean} true when the restart was scheduled, false when the
   *   process is not `started`, the given config fails
   *   `minimalConfigChecker`, or the stop request was refused.
   */
  restart(config) {
    if (this._status.value !== status.started) {
      return false;
    }
    if (config && !minimalConfigChecker(config)) {
      return false;
    }

    // Capture the config now: the stored one is cleared as soon as the status
    // becomes `stopped`, i.e. before the callback below runs.
    const nextConfig = config || this.currentConfig;

    // Subscribe before stopping so the `stopped` event cannot be missed.
    const pendingRestart = this.processEventStream
      .filter(processStatus => processStatus === status.stopped)
      .first()
      .subscribe(() => {
        this.start(nextConfig);
      });

    if (!this.stop()) {
      // No stop was issued, so drop the one-shot subscription; otherwise it
      // would respawn the process on some later, unrelated stop.
      pendingRestart.dispose();
      return false;
    }
    return true;
  }

  /**
   * Input data to the spawned process (via stdin).
   *
   * @param {*} data - a string is written as is, anything else is written as
   *   its JSON.stringify form. Falsy values are rejected.
   * @returns {boolean} true when the data was written, false when there is no
   *   child process, no stdin on it, or no data.
   */
  input(data) {
    if (!this._childProcess || !data) {
      return false;
    }
    const stdin = this._childProcess.stdin;
    if (!stdin) {
      // the child exposes no stdin to write to
      return false;
    }
    const payload = Type.is(data, String) ? data : JSON.stringify(data);
    stdin.write(payload);
    return true;
  }

  /**
   * Input via ipc. Only works when there is a child process with a `send`
   * method.
   *
   * @param {*} data - message handed to `childProcess.send`. Falsy values are
   *   rejected.
   * @returns {boolean} true when the message was handed over, false otherwise.
   */
  ipcInput(data) {
    if (data && this._childProcess && this._childProcess.send) {
      this._childProcess.send(data);
      return true;
    }
    return false;
  }

  /** Current lifecycle status (one of the `status` constants). */
  get processStatus() {
    return this._status.value;
  }

  /**
   * Shallow copy of the config the process was started with; an empty object
   * when no config is stored.
   */
  get currentConfig() {
    return Object.assign({}, this._currentConfig);
  }

  /** Payloads of `customEvts.process` events (the status changes). */
  get processEventStream() {
    return this._payloadsOfType(customEvts.process);
  }

  /** Payloads of `customEvts.logs` events. */
  get outEventStream() {
    return this._payloadsOfType(customEvts.logs);
  }

  /** Payloads of `customEvts.childError` events. */
  get errorStream() {
    return this._payloadsOfType(customEvts.childError);
  }

  /** Payloads of `customEvts.ipcData` events; only emits for ipc enabled processes. */
  get ipcStream() {
    return this._payloadsOfType(customEvts.ipcData);
  }

  /** Push a process-status event onto the internal event stream. */
  _emitStatus(newStatus) {
    this._evtStream.onNext(new processEvt(customEvts.process, newStatus));
  }

  /** Build an observable of the payloads of every event with the given type. */
  _payloadsOfType(type) {
    return this._evtStream.asObservable()
      .filter(evt => evt.type === type)
      .map(evt => evt.payload);
  }
}
// the SpawnWatch class is what this module exports
module.exports = SpawnWatch;