Skip to content

Commit

Permalink
Merge pull request #139 from binlaniua/master
Browse files Browse the repository at this point in the history
fix network break,   heart beat check not work
  • Loading branch information
hufeng authored Sep 25, 2019
2 parents 2457af4 + a178130 commit f1ca8a4
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 13 deletions.
5 changes: 5 additions & 0 deletions packages/dubbo/src/__tests__/socket-worker-test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import SocketWorker from '../socket-worker';

// 测试启动后,过一会关闭wifi, 过20次的心跳间隔,会关闭socket, 重新创建连接
const worker = SocketWorker.from('47.110.39.117:8888');
console.log(worker);
18 changes: 12 additions & 6 deletions packages/dubbo/src/decode-buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ const MAGIC_HIGH = 0xda;
const MAGIC_LOW = 0xbb;
const HEADER_LENGTH = 16;
const log = debug('dubbo:decode-buffer');
export const enum DataType {
Noop,
HeardBeat,
Data,
}

/**
* 在并发的tcp数据传输中,会出现少包,粘包的现象
Expand Down Expand Up @@ -52,7 +57,7 @@ export default class DecodeBuffer
return new DecodeBuffer(pid);
}

receive(data: Buffer) {
receive(data: Buffer): DataType {
//concat bytes
this._buffer = Buffer.concat([this._buffer, data]);
let bufferLength = this._buffer.length;
Expand All @@ -78,7 +83,7 @@ export default class DecodeBuffer

//没有找到magicHigh或者magicLow
if (magicHighIndex === -1 || magicLowIndex === -1) {
return;
return DataType.Noop;
}

if (
Expand All @@ -89,15 +94,15 @@ export default class DecodeBuffer
this._buffer = this._buffer.slice(magicHighIndex);
bufferLength = this._buffer.length;
}
return;
return DataType.Noop;
}

if (magicHigh === MAGIC_HIGH && magicLow === MAGIC_LOW) {
//数据量还不够头部的长度
if (bufferLength < HEADER_LENGTH) {
//waiting
log('bufferLength < header length');
return;
return DataType.Noop;
}

//取出头部字节
Expand All @@ -117,18 +122,19 @@ export default class DecodeBuffer
log(`SocketWorker#${this._pid} <=receive= heartbeat data.`);
this._buffer = this._buffer.slice(HEADER_LENGTH + bodyLength);
bufferLength = this._buffer.length;
return;
return DataType.HeardBeat;
}

if (HEADER_LENGTH + bodyLength > bufferLength) {
//waiting
log('header length + body length > buffer length');
return;
return DataType.Noop;
}
const dataBuffer = this._buffer.slice(0, HEADER_LENGTH + bodyLength);
this._buffer = this._buffer.slice(HEADER_LENGTH + bodyLength);
bufferLength = this._buffer.length;
this._subscriber(dataBuffer);
return DataType.Data;
}
}
}
Expand Down
36 changes: 29 additions & 7 deletions packages/dubbo/src/socket-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import debug from 'debug';
import net from 'net';
import Context from './context';
import {decode} from './decode';
import DecodeBuffer from './decode-buffer';
import DecodeBuffer, {DataType} from './decode-buffer';
import DubboEncoder from './encode';
import HeartBeat from './heartbeat';
import {SOCKET_STATUS} from './socket-status';
Expand All @@ -34,6 +34,7 @@ const RETRY_NUM = 20;
const RETRY_TIME = 3000;
//心跳频率
const HEART_BEAT = 180 * 1000;
const RETRY_HEARD_BEAT_TIME = 20;
const log = debug('dubbo:socket-worker');

/**
Expand All @@ -51,6 +52,7 @@ export default class SocketWorker implements IObservable<ISocketSubscriber> {
this.host = host;
this.port = port;
this._retry = RETRY_NUM;
this._retryHeartBeat = RETRY_HEARD_BEAT_TIME;
this._status = SOCKET_STATUS.PADDING;

log('new SocketWorker#%d|> %s %s', pid, host + ':' + port, this._status);
Expand All @@ -77,6 +79,8 @@ export default class SocketWorker implements IObservable<ISocketSubscriber> {
public readonly port: number;

private _retry: number;
private _retryTimeoutId: NodeJS.Timer;
private _retryHeartBeat: number;
private _heartBeatTimer: NodeJS.Timer;
private _socket: net.Socket;
private _status: SOCKET_STATUS;
Expand Down Expand Up @@ -153,10 +157,15 @@ export default class SocketWorker implements IObservable<ISocketSubscriber> {
// `SocketWorker#${this.pid} =connecting=> ${this.host}:${this.port}`,
// );

if (this._socket) {
this._socket.destroy();
}

this._socket = new net.Socket();
// Disable the Nagle algorithm.
this._socket.setNoDelay();

// Disable the Nagle algorithm.
// this._socket.setTimeout(10 * 1000)
// this._socket.setKeepAlive(true)
this._socket
.connect(
this.port,
Expand All @@ -179,6 +188,7 @@ export default class SocketWorker implements IObservable<ISocketSubscriber> {

//reset retry number
this._retry = RETRY_NUM;
this._retryHeartBeat = RETRY_HEARD_BEAT_TIME;

//notifiy subscriber, the socketworker was connected successfully
this._subscriber.onConnect({
Expand All @@ -188,15 +198,26 @@ export default class SocketWorker implements IObservable<ISocketSubscriber> {
});

//heartbeart
//when network is close, the connection maybe not close, so check the heart beat times
this._heartBeatTimer = setInterval(() => {
log('emit heartbeat');
this._socket.write(HeartBeat.encode());
if (this._retryHeartBeat > 0) {
log('emit heartbeat');
this._retryHeartBeat--;
this._socket.write(HeartBeat.encode());
} else {
this._onClose(false);
}
}, HEART_BEAT);
};

private _onData = data => {
log(`SocketWorker#${this.pid} =receive data=> ${this.host}:${this.port}`);
this._decodeBuff.receive(data);
const dataType = this._decodeBuff.receive(data);
switch (dataType) {
case DataType.HeardBeat:
this._retryHeartBeat = RETRY_HEARD_BEAT_TIME; //reset heart beat times
break;
}
};

private _onError = (error: Error) => {
Expand Down Expand Up @@ -238,7 +259,8 @@ export default class SocketWorker implements IObservable<ISocketSubscriber> {
//set current status
this._status = SOCKET_STATUS.RETRY;
//retry when delay RETRY_TIME
setTimeout(() => {
clearTimeout(this._retryTimeoutId);
this._retryTimeoutId = setTimeout(() => {
this._retry--;
this._initSocket();
}, RETRY_TIME);
Expand Down

0 comments on commit f1ca8a4

Please sign in to comment.