Skip to content
43 changes: 43 additions & 0 deletions benchmarks/message-io/incoming-message-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
const { createBenchmark } = require('../common');
const { Readable } = require('stream');

const Debug = require('tedious/lib/debug');
const IncomingMessageStream = require('tedious/lib/incoming-message-stream');
const { Packet } = require('tedious/lib/packet');

const bench = createBenchmark(main, {
n: [100, 1000, 10000, 100000]
});

function main({ n }) {
const debug = new Debug();

const stream = Readable.from((async function*() {
for (let i = 0; i < n; i++) {
const packet = new Packet(2);
packet.last(true);
packet.addData(Buffer.from([1, 2, 3, 4, 5, 6, 7, 8, 9]));

yield packet.buffer;
}
})());

const incoming = new IncomingMessageStream(debug);
stream.pipe(incoming);

bench.start();
console.profile('incoming-message-stream');

(async function() {
let total = 0;

for await (m of incoming) {
for await (const buf of m) {
total += buf.length;
}
}

console.profileEnd('incoming-message-stream');
bench.end(n);
})();
}
72 changes: 72 additions & 0 deletions benchmarks/message-io/outgoing-message-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
const { createBenchmark } = require('../common');
const { Duplex } = require('stream');

const Debug = require('../../lib/debug');
const OutgoingMessageStream = require('../../lib/outgoing-message-stream');
const Message = require('../../lib/message');

const bench = createBenchmark(main, {
n: [100, 1000, 10000, 100000]
});

function main({ n }) {
const debug = new Debug();

const stream = new Duplex({
read() {},
write(chunk, encoding, callback) {
// Just consume the data
callback();
}
});

const payload = [
Buffer.alloc(1024),
Buffer.alloc(1024),
Buffer.alloc(1024),
Buffer.alloc(256),
Buffer.alloc(256),
Buffer.alloc(256),
Buffer.alloc(256),
];

const out = new OutgoingMessageStream(debug, {
packetSize: 8 + 1024
});
out.pipe(stream);

bench.start();
console.profile('write-message');

function writeNextMessage(i) {
if (i == n) {
out.end();
out.once('finish', () => {
console.profileEnd('write-message');
bench.end(n);
});
return;
}

const m = new Message({ type: 2, resetConnection: false });
out.write(m);

for (const buf of payload) {
m.write(buf);
}

m.end();

if (out.needsDrain) {
out.once('drain', () => {
writeNextMessage(i + 1);
});
} else {
process.nextTick(() => {
writeNextMessage(i + 1);
});
}
}

writeNextMessage(0);
}
39 changes: 39 additions & 0 deletions benchmarks/message-io/read-message.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
const { createBenchmark } = require('../common');
const { Readable } = require('stream');

const Debug = require('tedious/lib/debug');
const MessageIO = require('tedious/lib/message-io');
const { Packet } = require('tedious/lib/packet');

const bench = createBenchmark(main, {
n: [100, 1000, 10000, 100000]
});

function main({ n }) {
const debug = new Debug();

const stream = Readable.from((async function*() {
for (let i = 0; i < n; i++) {
const packet = new Packet(2);
packet.last(true);
packet.addData(Buffer.from([1, 2, 3, 4, 5, 6, 7, 8, 9]));

yield packet.buffer;
}
})());

(async function() {
bench.start();
console.profile('read-message');

let total = 0;
for (let i = 0; i < n; i++) {
for await (const chunk of MessageIO.readMessage(stream, debug)) {
total += chunk.length;
}
}

console.profileEnd('read-message');
bench.end(n);
})();
}
43 changes: 43 additions & 0 deletions benchmarks/message-io/write-message.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
const { createBenchmark, createConnection } = require('../common');
const { Duplex } = require('stream');

const Debug = require('tedious/lib/debug');
const MessageIO = require('tedious/lib/message-io');

const bench = createBenchmark(main, {
n: [100, 1000, 10000, 100000]
});

function main({ n }) {
const debug = new Debug();

const stream = new Duplex({
read() {},
write(chunk, encoding, callback) {
// Just consume the data
callback();
}
});

const payload = [
Buffer.alloc(1024),
Buffer.alloc(1024),
Buffer.alloc(1024),
Buffer.alloc(256),
Buffer.alloc(256),
Buffer.alloc(256),
Buffer.alloc(256),
];

(async function() {
bench.start();
console.profile('write-message');

for (let i = 0; i <= n; i++) {
await MessageIO.writeMessage(stream, debug, 8 + 1024, 2, payload);
}

console.profileEnd('write-message');
bench.end(n);
})();
}
38 changes: 24 additions & 14 deletions src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2054,6 +2054,25 @@

socket.setKeepAlive(true, KEEP_ALIVE_INITIAL_DELAY);

try {
await this.sendPreLogin(socket);
} catch (err) {
signal.throwIfAborted();

throw this.wrapSocketError(err as Error);
}

this.transitionTo(this.STATE.SENT_PRELOGIN);

let preloginResponse: PreloginPayload;
try {
preloginResponse = await this.readPreloginResponse(socket, signal);
} catch (err) {
signal.throwIfAborted();

throw this.wrapSocketError(err as Error);
}

this.messageIo = new MessageIO(socket, this.config.options.packetSize, this.debug);
this.messageIo.on('secure', (cleartext) => { this.emit('secure', cleartext); });

Expand All @@ -2062,10 +2081,6 @@
this.closed = false;
this.debug.log('connected to ' + this.config.server + ':' + this.config.options.port);

this.sendPreLogin();

this.transitionTo(this.STATE.SENT_PRELOGIN);
const preloginResponse = await this.readPreloginResponse(signal);
await this.performTlsNegotiation(preloginResponse, signal);

this.sendLogin7Packet();
Expand Down Expand Up @@ -2415,7 +2430,7 @@
/**
* @private
*/
sendPreLogin() {
async sendPreLogin(socket: net.Socket) {
const [, major, minor, build] = /^(\d+)\.(\d+)\.(\d+)/.exec(version) ?? ['0.0.0', '0', '0', '0'];
const payload = new PreloginPayload({
// If encrypt setting is set to 'strict', then we should have already done the encryption before calling
Expand All @@ -2425,7 +2440,7 @@
version: { major: Number(major), minor: Number(minor), build: Number(build), subbuild: 0 }
});

this.messageIo.sendMessage(TYPE.PRELOGIN, payload.data);
await MessageIO.writeMessage(socket, this.debug, this.config.options.packetSize, TYPE.PRELOGIN, [ payload.data ]);
this.debug.payload(function() {
return payload.toString(' ');
});
Expand Down Expand Up @@ -3314,7 +3329,7 @@
}
}

async readPreloginResponse(signal: AbortSignal): Promise<PreloginPayload> {
async readPreloginResponse(socket: net.Socket, signal: AbortSignal): Promise<PreloginPayload> {
signal.throwIfAborted();

let messageBuffer = Buffer.alloc(0);
Expand All @@ -3325,14 +3340,9 @@
signal.addEventListener('abort', onAbort, { once: true });

try {
const message = await Promise.race([
this.messageIo.readMessage().catch((err) => {
throw this.wrapSocketError(err);
}),
signalAborted
]);

const message = MessageIO.readMessage(socket, this.debug)

Check failure on line 3343 in src/connection.ts

View workflow job for this annotation

GitHub Actions / Linting

Missing semicolon
const iterator = message[Symbol.asyncIterator]();

try {
while (true) {
const { done, value } = await Promise.race([
Expand Down
12 changes: 11 additions & 1 deletion src/connector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,17 @@ export async function connectInSequence(options: { host: string, port: number, l
signal.throwIfAborted();

const errors: any[] = [];
const addresses = await lookupAllAddresses(options.host, lookup, signal);
const startTime = process.hrtime();
console.log('looking up addresses for ', options.host);

let addresses: dns.LookupAddress[] = [];
try {
addresses = await lookupAllAddresses(options.host, lookup, signal);
} catch (err) {
console.log('lookup failed', err, process.hrtime(startTime));
throw err;
}
console.log('looked up addresses for', options.host, process.hrtime(startTime));

for (const address of addresses) {
try {
Expand Down
Loading
Loading