Skip to content

Commit

Permalink
Merge pull request #122 from microsoft/hectorjavierhdz/pgcursor
Browse files Browse the repository at this point in the history
[Postgres] Add support for cursor and streams
  • Loading branch information
hectorhdzg authored Apr 5, 2023
2 parents 65cc137 + 8a2622a commit 748492a
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 29 deletions.
2 changes: 1 addition & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@
"**/CVS": true,
"**/.DS_Store": true,
"**/.dist": true,
"**/node_modules": true
"**/node_modules": false
}
}
82 changes: 54 additions & 28 deletions src/diagnostic-channel-publishers/src/pg.pub.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for details.
import {channel, IModulePatcher, PatchFunction} from "diagnostic-channel";
import {EventEmitter} from "events";
import { channel, IModulePatcher } from "diagnostic-channel";
import { EventEmitter } from "events";

const publisherName = "postgres";

Expand Down Expand Up @@ -30,7 +30,7 @@ export interface IPostgresData {
time: Date;
}

type PostgresCallback = (err: Error, res: IPostgresResult) => any;
type PostgresCallback = (err: Error | null, res: IPostgresResult) => unknown;

function postgres6PatchFunction(originalPg, originalPgPath) {
const originalClientQuery = originalPg.Client.prototype.query;
Expand Down Expand Up @@ -58,7 +58,7 @@ function postgres6PatchFunction(originalPg, originalPgPath) {
cb = cb[diagnosticOriginalFunc];
}

const trackingCallback = channel.bindToContext(function(err: Error, res: IPostgresResult): any {
const trackingCallback = channel.bindToContext(function (err: Error, res: IPostgresResult): any {
const end = process.hrtime(start);
data.result = res && { rowCount: res.rowCount, command: res.command };
data.error = err;
Expand Down Expand Up @@ -154,7 +154,7 @@ function postgres6PatchFunction(originalPg, originalPgPath) {
return originalPg;
}

function postgres7PatchFunction(originalPg, originalPgPath) {
function postgresLatestPatchFunction(originalPg, originalPgPath) {
const originalClientQuery = originalPg.Client.prototype.query;
const diagnosticOriginalFunc = "__diagnosticOriginalFunc";

Expand All @@ -173,15 +173,16 @@ function postgres7PatchFunction(originalPg, originalPgPath) {
duration: 0,
time: new Date()
};
let queryResult: unknown;
const start = process.hrtime();
let queryResult;


function patchCallback(cb?: PostgresCallback): PostgresCallback {
if (cb && cb[diagnosticOriginalFunc]) {
cb = cb[diagnosticOriginalFunc];
}

const trackingCallback = channel.bindToContext(function(err: Error, res: IPostgresResult): any {
const trackingCallback = channel.bindToContext(function (err: Error, res: IPostgresResult): any {
const end = process.hrtime(start);
data.result = res && { rowCount: res.rowCount, command: res.command };
data.error = err;
Expand Down Expand Up @@ -228,7 +229,7 @@ function postgres7PatchFunction(originalPg, originalPgPath) {
} else {
callbackProvided = typeof values === "function";
values = callbackProvided ? patchCallback(values) : values;
}
}
}
} else {
if (typeof config.name === "string") {
Expand All @@ -238,7 +239,10 @@ function postgres7PatchFunction(originalPg, originalPgPath) {
text: config.text,
args: config.values
};
} else {
} else if ((config as any).cursor) {
data.query.text = config.cursor?.text;
}
else {
data.query.text = config.text;
}

Expand All @@ -263,24 +267,46 @@ function postgres7PatchFunction(originalPg, originalPgPath) {
arguments[2] = callback;
arguments.length = (arguments.length > 3) ? arguments.length : 3;

queryResult = originalClientQuery.apply(this, arguments);
try {
queryResult = originalClientQuery.apply(this, arguments);
}
catch (err) {
patchCallback()(err, undefined);
throw err;
}

if (!callbackProvided) {
// no callback, so create a pass along promise
return queryResult
// pass resolved promise after publishing the event
.then((result) => {
patchCallback()(undefined, result);
return new this._Promise((resolve, reject) => {
resolve(result);
if ((queryResult instanceof Promise)) {
return queryResult
// pass resolved promise after publishing the event
.then((result) => {
patchCallback()(undefined, result);
return new this._Promise((resolve, reject) => {
resolve(result);
});
})
// pass along rejected promise after publishing the error
.catch((error) => {
patchCallback()(error, undefined);
return new this._Promise((resolve, reject) => {
reject(error);
});
});
})
// pass along rejected promise after publishing the error
.catch((error) => {
patchCallback()(error, undefined);
return new this._Promise((resolve, reject) => {
reject(error);
});
});
}
// Result could be a Cursor, QueryStream or Readable Stream
else {
let command = (queryResult as any).text ? (queryResult as any).text : "";
if ((queryResult as any).cursor) {
command = (queryResult as any).cursor?.text;
}
if (command) {
const res: IPostgresResult = {
command: command,
rowCount: 0,
};
patchCallback()(undefined, res);
}
}
}
return queryResult;
};
Expand All @@ -293,13 +319,13 @@ export const postgres6: IModulePatcher = {
patch: postgres6PatchFunction
};

export const postgres7: IModulePatcher = {
export const postgres: IModulePatcher = {
versionSpecifier: ">=7.* <=8.*",
patch: postgres7PatchFunction,
patch: postgresLatestPatchFunction,
publisherName: publisherName
};

export function enable() {
channel.registerMonkeyPatch("pg", postgres6);
channel.registerMonkeyPatch("pg", postgres7);
channel.registerMonkeyPatch("pg", postgres);
}

0 comments on commit 748492a

Please sign in to comment.