Skip to content

Commit

Permalink
server: Deluge: use async "deflate" and "inflate", handle errors
Browse files Browse the repository at this point in the history
  • Loading branch information
jesec committed Jun 25, 2021
1 parent 57a43bb commit dafe036
Showing 1 changed file with 55 additions and 38 deletions.
93 changes: 55 additions & 38 deletions server/services/Deluge/clientRequestManager.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {deflateSync, inflateSync} from 'zlib';
import {deflate, inflate} from 'zlib';
import fs from 'fs';
import os from 'os';
import path from 'path';
Expand Down Expand Up @@ -34,11 +34,9 @@ class ClientRequestManager {
private requestQueue: Record<number, [(data: RencodableData) => void, (err: Error) => void]> = {};
private rpc?: Promise<tls.TLSSocket>;
private rpcWithAuth?: Promise<tls.TLSSocket>;
private rpcBuffer?: Buffer;
private rpcBufferSize = 0;

private async receive(data: Buffer): Promise<void> {
const response = decode(inflateSync(data)) as RencodableArray;
const response = decode(data) as RencodableArray;
switch (response[0]) {
case DelugeRpcResponseType.RESPONSE: {
const [, request_id, return_value] = response;
Expand Down Expand Up @@ -74,19 +72,26 @@ class ClientRequestManager {
}

const requestId = this.requestId++;
const payloadBuf = deflateSync(encode([[requestId, ...request]]));

const {length} = payloadBuf;
if (length > 0xff_ff_ff_ff) {
throw new Error('Payload is too large.');
}
return await new Promise<RencodableData>((resolve, reject) => {
deflate(encode([[requestId, ...request]]), (err, payloadBuf) => {
if (err) {
reject(err);
return;
}

const lengthBuf = Buffer.alloc(4);
lengthBuf.writeUInt32BE(length, 0);
const {length} = payloadBuf;
if (length > 0xff_ff_ff_ff) {
reject(new Error('Payload is too large.'));
return;
}

return await new Promise<RencodableData>((resolve, reject) => {
this.requestQueue[requestId] = [resolve, reject];
rpc.write(Buffer.concat([protocolVerBuf, lengthBuf, payloadBuf]));
const lengthBuf = Buffer.alloc(4);
lengthBuf.writeUInt32BE(length, 0);

this.requestQueue[requestId] = [resolve, reject];
rpc.write(Buffer.concat([protocolVerBuf, lengthBuf, payloadBuf]));
});
});
}

Expand All @@ -100,8 +105,9 @@ class ClientRequestManager {

this.requestId = 0;
this.requestQueue = {};
this.rpcBufferSize = 0;
this.rpcBuffer = undefined;

let rpcBufferSize = 0;
let rpcBuffer: Buffer | null = null;

const tlsSocket = tls.connect({
host: this.connectionSettings.host,
Expand All @@ -110,33 +116,44 @@ class ClientRequestManager {
rejectUnauthorized: false,
});

tlsSocket.on('error', (e) => {
this.rpcBuffer = undefined;
this.rpcBufferSize = 0;
const handleError = (e: Error) => {
tlsSocket.destroy();
this.rpcWithAuth = this.rpc = Promise.reject();
reject(e);
});
};

tlsSocket.on('data', (chunk: Buffer) => {
if (this.rpcBuffer != null) {
this.rpcBuffer = Buffer.concat([this.rpcBuffer, chunk], this.rpcBufferSize);
} else {
if (chunk[0] !== DELUGE_RPC_PROTOCOL_VERSION) {
reject(new Error('Unexpected Deluge RPC version.'));
return;
}
tlsSocket.once('error', handleError);
tlsSocket.once('close', handleError);

this.rpcBufferSize = chunk.slice(1, 5).readUInt32BE(0);
this.rpcBuffer = chunk.slice(5);
}
tlsSocket.on('secureConnect', () => {
tlsSocket.on('data', (chunk: Buffer) => {
if (rpcBuffer != null) {
rpcBuffer = Buffer.concat([rpcBuffer, chunk], rpcBufferSize);
} else {
if (chunk[0] !== DELUGE_RPC_PROTOCOL_VERSION) {
handleError(new Error('Unexpected Deluge RPC version.'));
return;
}

rpcBufferSize = chunk.slice(1, 5).readUInt32BE(0);
rpcBuffer = chunk.slice(5);
}

if (this.rpcBuffer.length >= this.rpcBufferSize) {
this.receive(this.rpcBuffer);
this.rpcBufferSize = 0;
this.rpcBuffer = undefined;
}
});
if (rpcBuffer.length >= rpcBufferSize) {
const buf = rpcBuffer;
rpcBuffer = null;
rpcBufferSize = 0;
inflate(buf, (err, data) => {
if (err) {
handleError(err);
return;
}

this.receive(data);
});
}
});

tlsSocket.on('secureConnect', () => {
resolve(tlsSocket);
});
});
Expand Down

0 comments on commit dafe036

Please sign in to comment.