Allow using the POST body for queries #36

Open · wants to merge 2 commits into base: master
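For context, ClickHouse's HTTP interface accepts the SQL statement either as the `query` URL parameter or as the body of a POST request; this PR adds an opt-in `post` mode that switches the library to the latter, which avoids URL-length limits for large statements. Below is a minimal sketch of the two request shapes using the `request` package the library already depends on (host and query are illustrative):

const request = require('request');

const base = 'http://localhost:8123/';
const sql = 'SELECT number FROM system.numbers LIMIT 5 FORMAT JSON';

// post: false (current behaviour): the statement travels in the URL query string.
request.post({ url: base + '?query=' + encodeURIComponent(sql) }, (err, res) => {
	console.log(err || res.body);
});

// post: true (added by this PR): the statement travels in the POST body.
request.post({ url: base, body: sql }, (err, res) => {
	console.log(err || res.body);
});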
1 change: 1 addition & 0 deletions README.md
@@ -23,6 +23,7 @@ const clickhouse = new ClickHouse({
port: 8123,
debug: false,
basicAuth: null,
post: false,
isUseGzip: false,
format: "json", // "json" || "csv" || "tsv"
config: {
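A minimal usage sketch of the option documented above (connection details are illustrative, and the `toPromise()` call assumes the query helpers shown elsewhere in this README):

const { ClickHouse } = require('clickhouse');

const clickhouse = new ClickHouse({
	url: 'http://localhost',
	port: 8123,
	post: true, // send the SQL statement in the POST body instead of the query string
});

(async () => {
	// Long statements no longer risk exceeding URL length limits,
	// because the query itself is carried in the request body.
	const rows = await clickhouse.query('SELECT 1 AS x').toPromise();
	console.log(rows);
})();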
218 changes: 49 additions & 169 deletions index.js
@@ -8,10 +8,8 @@ const
stream = require('stream'),
querystring = require('querystring'),
JSONStream = require('JSONStream'),
through = require('through'),
stream2asynciter = require('stream2asynciter'),
{ URL } = require('url'),
tsv = require('tsv');
{ URL } = require('url');


/**
@@ -55,86 +53,6 @@ const PORT = 8123;
const DATABASE = 'default';
const USERNAME = 'default';

function parseCSV(body, options = { header: true }) {
const data = new tsv.Parser(SEPARATORS.CSV, options).parse(body);
data.splice(data.length - 1, 1);
return data;
}

function parseJSON(body) {
return JSON.parse(body);
}

function parseTSV(body, options = { header: true }) {
const data = new tsv.Parser(SEPARATORS.TSV, options).parse(body);
data.splice(data.length - 1, 1);
return data;
}

function parseCSVStream(s) {
let isFirst = true;
let ref = {
fields: []
};
return through(function (chunk) {
let str = chunk.toString();
let parsed = parseCSV(str, {header: isFirst});
let strarr = str.split("\n");
let plen = (isFirst && strarr.length - 1 || strarr.length) - parsed.length;

if (!isFirst) {
chunk = Buffer.concat([Buffer.from([...s].join("\n")), chunk]).toString();
parsed = parseCSV(str, {header: isFirst});
s = new Set();
}
strarr.splice(strarr.length - plen).forEach((value => s.add(value)));
chunkBuilder.call(this, isFirst, ref, str, parsed);
isFirst = false;
})
}

function parseJSONStream() {
return JSONStream.parse(['data', true]);
}

function parseTSVStream(s) {
let isFirst = true;
let ref = {
fields: []
};
return through(function (chunk) {
let str = chunk.toString();
let parsed = parseTSV(str, {header: isFirst});
let strarr = str.split("\n");
let plen = (isFirst && strarr.length - 1 || strarr.length) - parsed.length;

if (!isFirst) {
chunk = Buffer.concat([Buffer.from([...s].join("\n")), chunk]).toString();
parsed = parseTSV(str, {header: isFirst});
s = new Set();
}
strarr.splice(strarr.length - plen).forEach((value => s.add(value)));
chunkBuilder.call(this, isFirst, ref, str, parsed);
isFirst = false;
});
}

function chunkBuilder(isFirst, ref, chunk, parsed) {
if (isFirst) {
ref.fields = Object.keys(parsed[0]);
parsed.forEach((value) => {
this.queue(value);
});
} else {
parsed.forEach((value) => {
let result = {};
ref.fields.forEach((field, index) => (result[field] = value[index]));
this.queue(result);
result = null;
});
}
}

function encodeValue(quote, v, format, isArray) {
format = ALIASES[format] || format;

@@ -313,7 +231,9 @@ class QueryCursor {
if (me.opts.debug) {
console.log('exec req headers', me.reqParams.headers);
}

if (me.opts.debug) {
console.log('exec req params', me.reqParams);
}
me._request = request.post(me.reqParams, (err, res) => {
if (me.opts.debug) {
console.log('exec', err, _.pick(res, [
@@ -338,39 +258,17 @@
if (me.opts.debug) {
console.log('exec res headers', res.headers);
}

try {
const result = this._parseRowsByFormat(res.body);
cb(null, me.useTotals ? result : result.data || result)
} catch (e) {
cb(e);
let json = JSON.parse(res.body);

cb(null, me.useTotals ? json : json.data);
} catch (err2) {
cb(err2);
}
});
}

_parseRowsByFormat(body, isStream = false) {
let result = null;
let ws;
switch (this._getFormat()) {
case "json":
result = !isStream && parseJSON(body) || parseJSONStream();
break;
case "tsv":
result = !isStream && parseTSV(body) || parseTSVStream(new Set());
break;
case "csv":
result = !isStream && parseCSV(body) || parseCSVStream(new Set());
break;
default:
result = body;
}
return result;
};

_getFormat() {
return this.opts.sessionFormat || this.opts.format;
}


withTotals() {
this.useTotals = true;
return this;
@@ -406,7 +304,7 @@ class QueryCursor {

return rs;
} else {
const streamParser = this._parseRowsByFormat(null, true);
const toJSON = JSONStream.parse(['data', true]);

const rs = new stream.Readable({ objectMode: true });
rs._read = () => {};
@@ -415,17 +313,14 @@ class QueryCursor {
const tf = new stream.Transform({ objectMode: true });
let isFirstChunck = true;
tf._transform = function (chunk, encoding, cb) {

// If the first character of the first chunk of the data block is not '{', then:
// 1. the response body is not JSON
// 2. the server found an error in the query data
if (isFirstChunck && (
(me._getFormat() === "json" && chunk[0] !== 123) &&
(me._getFormat() === "csv" && chunk[0] !== 110) &&
(me._getFormat() === "tsv" && chunk[0] !== 110)
)) {
if (isFirstChunck && chunk[0] !== 123) {
this.error = new Error(chunk.toString());

streamParser.emit("error", this.error);
toJSON.emit("error", this.error);
rs.emit('close');

return cb();
@@ -445,9 +340,9 @@ class QueryCursor {
let s = null;
if (me.opts.isUseGzip) {
const z = zlib.createGunzip();
s = requestStream.pipe(z).pipe(tf).pipe(streamParser)
s = requestStream.pipe(z).pipe(tf).pipe(toJSON)
} else {
s = requestStream.pipe(tf).pipe(streamParser)
s = requestStream.pipe(tf).pipe(toJSON)
}


@@ -475,14 +370,14 @@ class ClickHouse {
rs.pause = () => {
rs.__pause();
requestStream.pause();
streamParser.pause();
toJSON.pause();
};

rs.__resume = rs.resume;
rs.resume = () => {
rs.__resume();
requestStream.resume();
streamParser.resume();
toJSON.resume();
};

me._request = rs;
@@ -519,13 +414,13 @@ class ClickHouse {
password: '',
basicAuth: null,
isUseGzip: false,
post: false,
config: {
// session_id : Date.now(),
session_timeout : 60,
output_format_json_quote_64bit_integers : 0,
enable_http_compression : 0
},
format: "json", // "json" || "csv" || "tsv"
}
},
opts
);
@@ -623,40 +518,8 @@ class ClickHouse {
return encodeValue(false, value, 'TabSeparated');
}).join('\t');
}

_getFormat(query) {
let format = "";
switch (this.opts.format) {
case "json":
format = this._parseFormat(query, " format JSON");
break;
case "tsv":
format = this._parseFormat(query, " format TabSeparatedWithNames");
break;
case "csv":
format = this._parseFormat(query, " format CSVWithNames");
break;
default:
format = " ";
}
return format;
};

_parseFormat(query, def) {
if (query.match(/format/mg) === null) {
this.opts.sessionFormat = this.opts.format;
return def;
}
if (query.match(/format JSON/mg) !== null) {
this.opts.sessionFormat = "json";
} else if (query.match(/format TabSeparated/mg) !== null) {
this.opts.sessionFormat = "tsv";
} else if (query.match(/format CSV/mg) !== null) {
this.opts.sessionFormat = "csv";
}
return "";
}



_mapRowAsObject(fieldList, row) {
return fieldList.map(f => encodeValue(false, row[f] != null ? row[f] : '', 'TabSeparated')).join('\t');
}
@@ -712,10 +575,18 @@ class ClickHouse {
let sql = query.trim();

// Hack for Sequelize ORM
sql = sql.trimEnd().replace(/;$/gm, "");

if (sql.charAt(sql.length - 1) === ';') {
sql = sql.substr(0, sql.length - 1);
}

if (sql.match(/^(select|show|exists)/i)) {
reqParams['url'] = me.url + '?query=' + encodeURIComponent(sql + this._getFormat(sql) + ';') + '&' + querystring.stringify(configQS);
reqParams['url'] = me.url + '?' + querystring.stringify(configQS);
if (me.opts.post) {
reqParams['body'] = sql + ' FORMAT JSON';
} else {
reqParams['url'] = reqParams['url'] + '&query=' + encodeURIComponent(sql + ' FORMAT JSON');
}

if (me.opts.username) {
reqParams['url'] = reqParams['url'] + '&user=' + me.opts.username;
}
Expand All @@ -742,8 +613,13 @@ class ClickHouse {
reqParams['formData'] = formData;
}
} else if (query.match(/^insert/i)) {
reqParams['url'] = me.url + '?query=' + encodeURIComponent(sql + ' FORMAT TabSeparated') + '&' + querystring.stringify(configQS);

reqParams['url'] = me.url + '?' + querystring.stringify(configQS);
if (me.opts.post) {
reqParams['body'] = sql + ' FORMAT TabSeparated';
} else {
reqParams['url'] = reqParams['url'] + '&query=' + encodeURIComponent(sql + ' FORMAT TabSeparated');
}

if (me.opts.username) {
reqParams['url'] = reqParams['url'] + '&user=' + me.opts.username;
}
Expand All @@ -753,10 +629,15 @@ class ClickHouse {
}

if (data) {
reqParams['body'] = me._getBodyForInsert(sql, data);
reqParams['body'] = me._getBodyForInsert(query, data);
}
} else {
reqParams['url'] = me.url + '?query=' + encodeURIComponent(sql + ";") + '&' + querystring.stringify(configQS);
reqParams['url'] = me.url + '?' + querystring.stringify(configQS);
if (me.opts.post) {
reqParams['body'] = sql;
} else {
reqParams['url'] = reqParams['url'] + '&query=' + encodeURIComponent(sql);
}

if (me.opts.username) {
reqParams['url'] = reqParams['url'] + '&user=' + me.opts.username;
@@ -801,6 +682,5 @@ class ClickHouse {
}

module.exports = {
ClickHouse
};
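Since the changes above drop the CSV/TSV parsers and keep only the JSONStream path, here is a short sketch of a streaming read after this PR (it assumes the `.stream()` cursor API this library exposes; the query is illustrative):

const { ClickHouse } = require('clickhouse');

const clickhouse = new ClickHouse({ url: 'http://localhost', port: 8123, post: true });

// Rows are emitted as plain objects produced by JSONStream.parse(['data', true]),
// which is now the only streaming parser left in index.js.
clickhouse.query('SELECT number FROM system.numbers LIMIT 10')
	.stream()
	.on('data', row => console.log(row))
	.on('error', err => console.error(err))
	.on('end', () => console.log('stream finished'));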