Skip to content

Commit 6ab37e1

Browse files
committed
start callback
1 parent 9fb3dcf commit 6ab37e1

File tree

1 file changed

+53
-109
lines changed

1 file changed

+53
-109
lines changed

lib/MyEmitter.js

Lines changed: 53 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@ const mysql = require('mysql');
88
const EventEmitter = require('events');
99

1010

11-
class MyEmitter extends EventEmitter
12-
{
11+
class MyEmitter extends EventEmitter {
1312

1413
/**
1514
* @param Object conf
@@ -20,29 +19,25 @@ class MyEmitter extends EventEmitter
2019
* @param Object [conf.binlog.lastTime=0] - Start emitting events after this time
2120
* @param Object [conf.binlog.recoverTimeout=240]
2221
*/
23-
constructor(conf)
24-
{
22+
constructor(conf) {
2523
super();
2624

2725

2826
// Create mysql pool
2927
this.pool = mysql.createPool(Object.assign({}, (conf.mysql ? conf.mysql : {})));
3028

3129
// Check connection
32-
this.pool.getConnection(function(err, conn)
33-
{
30+
this.pool.getConnection(function(err, conn) {
3431
if (err) throw err;
35-
conn.ping(function(err)
36-
{
32+
conn.ping(function(err) {
3733
if (err) throw err;
3834
conn.release();
3935
});
4036
});
4137

4238

4339
// Conf
44-
conf.binlog = Object.assign(
45-
{
40+
conf.binlog = Object.assign({
4641
slaveId: 1,
4742
lastPos: 0,
4843
lastTime: 0,
@@ -60,8 +55,7 @@ class MyEmitter extends EventEmitter
6055
},
6156
packet: // BinlogPacket
6257
{
63-
last:
64-
{
58+
last: {
6559
pos: conf.binlog.lastPos,
6660
time: conf.binlog.lastTime,
6761
},
@@ -82,8 +76,7 @@ class MyEmitter extends EventEmitter
8276
this._binlogRowsEvents = false;
8377
this._wildcardEvents = false;
8478

85-
this.on('newListener', function(event, listener)
86-
{
79+
this.on('newListener', function(event, listener) {
8780
if (
8881
typeof event == 'number' &&
8982
event < Event.ANY
@@ -97,8 +90,7 @@ class MyEmitter extends EventEmitter
9790
event == BinlogEvent.DELETE_ROWS_EVENT
9891
) this._binlogRowsEvents = true;
9992

100-
if (event == Event.ANY)
101-
{
93+
if (event == Event.ANY) {
10294
this._binlogNativeEvents = true;
10395
this._binlogRowsEvents = true;
10496
this._wildcardEvents = true;
@@ -113,57 +105,46 @@ class MyEmitter extends EventEmitter
113105

114106

115107
/* Shortcuts */
116-
get pos()
117-
{
108+
get pos() {
118109
return this._conf.packet.last.pos;
119110
}
120-
get time()
121-
{
111+
get time() {
122112
return this._conf.packet.last.time;
123113
}
124114

125115
/**
126116
* @param Function [cb]
127117
*/
128-
start(cb)
129-
{
130-
this._loadHasChecksum(cb ? cb : (err) => { throw err; });
118+
start(cb) {
119+
this._loadHasChecksum(cb ? cb : (err) => { if (err) throw err; });
131120
}
132121

133-
_loadHasChecksum(cb)
134-
{
135-
if (this._conf.packet.hasChecksum === null)
136-
{
122+
_loadHasChecksum(cb) {
123+
if (this._conf.packet.hasChecksum === null) {
137124
this.pool.query(
138125
'SELECT `VARIABLE_VALUE` FROM `information_schema`.`GLOBAL_VARIABLES` WHERE `VARIABLE_NAME` LIKE "MASTER_VERIFY_CHECKSUM"',
139-
function(err, res)
140-
{
126+
function(err, res) {
141127
if (err) return cb(err);
142128

143129
this._conf.packet.hasChecksum = (res.length > 0 && res[0].VARIABLE_VALUE == 'ON');
144130

145131
this._connect(cb);
146132

147133
}.bind(this));
148-
}
149-
else
150-
{
134+
} else {
151135
this._connect(cb);
152136
}
153137
}
154138

155-
_connect(cb)
156-
{
139+
_connect(cb) {
157140
// Dont connect twice
158-
if (this._conn)
159-
{
141+
if (this._conn) {
160142
cb(new Error('Already connected'));
161143
return;
162144
}
163145

164146
// Get connection
165-
this.pool.getConnection(function(err, conn)
166-
{
147+
this.pool.getConnection(function(err, conn) {
167148
if (err) return cb(err);
168149

169150
this._conn = conn;
@@ -178,11 +159,9 @@ class MyEmitter extends EventEmitter
178159
rg.on('unhandledError', this.emit.bind(this, Event.ERROR_COM));
179160
rg.on('timeout', this.emit.bind(this, Event.ERROR_COM));
180161

181-
rg.on('end', function()
182-
{
162+
rg.on('end', function() {
183163
// Binlog
184-
this._bl = new Binlog(
185-
{
164+
this._bl = new Binlog({
186165
binlog: this._conf.binlog,
187166
packet: this._conf.packet
188167
},
@@ -196,8 +175,7 @@ class MyEmitter extends EventEmitter
196175
this._bl.on('timeout', this.emit.bind(this, Event.TIMEOUT));
197176
this._bl.on('end', this.emit.bind(this, Event.END));
198177

199-
this._bl.on('handshake', function()
200-
{
178+
this._bl.on('handshake', function() {
201179
this.emit(Event.CONNECTED);
202180

203181
cb();
@@ -213,18 +191,14 @@ class MyEmitter extends EventEmitter
213191
}.bind(this));
214192
}
215193

216-
_binlog_cb(err, packet)
217-
{
218-
if (err)
219-
{
194+
_binlog_cb(err, packet) {
195+
if (err) {
220196
this.emit(Event.ERROR_COM, err);
221-
}
222-
else if (
197+
} else if (
223198
packet.error ||
224199
packet.data_error ||
225200
packet.data_row_errors
226-
)
227-
{
201+
) {
228202
let err1 = packet.error;
229203
let err2 = packet.data_error;
230204
let err3 = packet.data_row_errors;
@@ -233,41 +207,33 @@ class MyEmitter extends EventEmitter
233207
delete packet.data_error;
234208
delete packet.data_row_errors;
235209

236-
if (err1)
237-
{
210+
if (err1) {
238211
this.emit(Event.ERROR_PARSE, err1, packet);
239212
}
240213

241-
if (err2)
242-
{
214+
if (err2) {
243215
this.emit(Event.ERROR_PARSE_DATA, err2, packet);
244216
}
245217

246-
if (err3)
247-
{
248-
for (let i = 0, l = err3.length; i < l; i++)
249-
{
218+
if (err3) {
219+
for (let i = 0, l = err3.length; i < l; i++) {
250220
this.emit(Event.ERROR_PARSE_DATA, err3[i], packet);
251221
}
252222
}
253-
}
254-
else
255-
{
223+
} else {
256224
packet.skipped ?
257225
this.emit(Event.SKIP, packet) :
258226
this.emit(Event.BINLOG, packet);
259227
}
260228
}
261229

262230
/** Emit aditional events */
263-
emit(type, ...data)
264-
{
231+
emit(type, ...data) {
265232
// Super
266233
super.emit.apply(this, arguments);
267234

268235
// Wildcard Events
269-
if (this._wildcardEvents)
270-
{
236+
if (this._wildcardEvents) {
271237
super.emit.call(this, Event.ANY, type, ...data);
272238
}
273239

@@ -280,8 +246,7 @@ class MyEmitter extends EventEmitter
280246
) &&
281247
data[0] && // has packet object
282248
data[0].eventType // has event type
283-
)
284-
{
249+
) {
285250
var pck = data[0];
286251

287252
// native rows event
@@ -302,10 +267,8 @@ class MyEmitter extends EventEmitter
302267
pck.eventType == BinlogEvent.DELETE_ROWS_EVENTv1 ||
303268
pck.eventType == BinlogEvent.DELETE_ROWS_EVENTv2
304269
)
305-
)
306-
{
307-
switch (pck.eventType)
308-
{
270+
) {
271+
switch (pck.eventType) {
309272
case BinlogEvent.WRITE_ROWS_EVENTv0:
310273
case BinlogEvent.WRITE_ROWS_EVENTv1:
311274
case BinlogEvent.WRITE_ROWS_EVENTv2:
@@ -334,17 +297,15 @@ class MyEmitter extends EventEmitter
334297
}
335298

336299
// native binlog event
337-
if (this._binlogNativeEvents)
338-
{
300+
if (this._binlogNativeEvents) {
339301
this.emit(pck.eventType, pck);
340302
}
341303

342304
return;
343305
}
344306

345307
// Event.ERROR
346-
switch (type)
347-
{
308+
switch (type) {
348309
case Event.ERROR_SQL:
349310
case Event.ERROR_COM:
350311
case Event.ERROR_PARSE:
@@ -356,8 +317,7 @@ class MyEmitter extends EventEmitter
356317
}
357318

358319
// recover
359-
switch (type)
360-
{
320+
switch (type) {
361321
case Event.ERROR_SQL:
362322
case Event.ERROR_COM:
363323
case Event.ERROR_RECOVER:
@@ -370,24 +330,18 @@ class MyEmitter extends EventEmitter
370330
}
371331

372332

373-
_disconnect(cb)
374-
{
375-
try
376-
{
333+
_disconnect(cb) {
334+
try {
377335
this._bl.removeAllListeners(); // stop recovering
378336
this._conn.removeAllListeners();
379337

380338
this._bl.end();
381339

382340
this._conn._protocol.quit(); // throws a squence error
383341
this._conn.destroy(); // maybe the connection is broke
384-
}
385-
catch (erro)
386-
{
342+
} catch (erro) {
387343
// nothing to be done here
388-
}
389-
finally
390-
{
344+
} finally {
391345
this._bl = null;
392346
this._conn = null;
393347

@@ -396,31 +350,24 @@ class MyEmitter extends EventEmitter
396350
}
397351
}
398352

399-
_reconnect(cb)
400-
{
353+
_reconnect(cb) {
401354
this.emit(Event.RECONNECTING);
402355

403-
this._disconnect(function(err)
404-
{
356+
this._disconnect(function(err) {
405357
if (err) return cb(err);
406358

407359
this._connect(cb);
408360

409361
}.bind(this));
410362
}
411363

412-
_recover()
413-
{
364+
_recover() {
414365
this.emit(Event.RECOVERING);
415366

416-
this._disconnect(function()
417-
{
418-
setTimeout(function()
419-
{
420-
this._connect(function(err)
421-
{
422-
if (err)
423-
{
367+
this._disconnect(function() {
368+
setTimeout(function() {
369+
this._connect(function(err) {
370+
if (err) {
424371
this.emit(Event.ERROR_RECOVER, err);
425372

426373
if (!this._conn || !this._bl) this._recover();
@@ -437,19 +384,16 @@ class MyEmitter extends EventEmitter
437384
/**
438385
* @param Function [cb]
439386
*/
440-
stop(cb)
441-
{
387+
stop(cb) {
442388
this._disconnect(cb ? cb : (err) => { throw err; });
443389
}
444390

445391

446392
/**
447393
* @param Function [cb]
448394
*/
449-
restart(cb)
450-
{
451-
this.stop(function(err)
452-
{
395+
restart(cb) {
396+
this.stop(function(err) {
453397
if (err) { if (cb) cb(err); return; }
454398
this.start(cb);
455399
}.bind(this));

0 commit comments

Comments
 (0)