diff --git a/lib/DBSQLClient.ts b/lib/DBSQLClient.ts index c8bc1e10..cabcad15 100644 --- a/lib/DBSQLClient.ts +++ b/lib/DBSQLClient.ts @@ -84,6 +84,8 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I useCloudFetch: false, cloudFetchConcurrentDownloads: 10, + + useLZ4Compression: true, }; } diff --git a/lib/DBSQLOperation/index.ts b/lib/DBSQLOperation/index.ts index 8a0bf707..4a57d635 100644 --- a/lib/DBSQLOperation/index.ts +++ b/lib/DBSQLOperation/index.ts @@ -377,14 +377,14 @@ export default class DBSQLOperation implements IOperation { case TSparkRowSetType.ARROW_BASED_SET: resultSource = new ArrowResultConverter( this.context, - new ArrowResultHandler(this.context, this._data, metadata.arrowSchema), + new ArrowResultHandler(this.context, this._data, metadata.arrowSchema, metadata.lz4Compressed), metadata.schema, ); break; case TSparkRowSetType.URL_BASED_SET: resultSource = new ArrowResultConverter( this.context, - new CloudFetchResultHandler(this.context, this._data), + new CloudFetchResultHandler(this.context, this._data, metadata.lz4Compressed), metadata.schema, ); break; diff --git a/lib/DBSQLSession.ts b/lib/DBSQLSession.ts index aa23f7d1..8ccf4dea 100644 --- a/lib/DBSQLSession.ts +++ b/lib/DBSQLSession.ts @@ -184,6 +184,7 @@ export default class DBSQLSession implements IDBSQLSession { ...getArrowOptions(clientConfig), canDownloadResult: options.useCloudFetch ?? clientConfig.useCloudFetch, parameters: getQueryParameters(this.sessionHandle, options.namedParameters, options.ordinalParameters), + canDecompressLZ4Result: clientConfig.useLZ4Compression, }); const response = await this.handleResponse(operationPromise); const operation = this.createOperation(response); diff --git a/lib/contracts/IClientContext.ts b/lib/contracts/IClientContext.ts index 062d0795..712b845f 100644 --- a/lib/contracts/IClientContext.ts +++ b/lib/contracts/IClientContext.ts @@ -15,6 +15,8 @@ export interface ClientConfig { useCloudFetch: boolean; cloudFetchConcurrentDownloads: number; + + useLZ4Compression: boolean; } export default interface IClientContext { diff --git a/lib/result/ArrowResultHandler.ts b/lib/result/ArrowResultHandler.ts index 6978a7d6..52e2aafa 100644 --- a/lib/result/ArrowResultHandler.ts +++ b/lib/result/ArrowResultHandler.ts @@ -1,4 +1,4 @@ -import { Buffer } from 'buffer'; +import LZ4 from 'lz4'; import { TRowSet } from '../../thrift/TCLIService_types'; import IClientContext from '../contracts/IClientContext'; import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider'; @@ -10,10 +10,18 @@ export default class ArrowResultHandler implements IResultsProvider, arrowSchema?: Buffer) { + private readonly isLZ4Compressed: boolean; + + constructor( + context: IClientContext, + source: IResultsProvider, + arrowSchema?: Buffer, + isLZ4Compressed?: boolean, + ) { this.context = context; this.source = source; this.arrowSchema = arrowSchema; + this.isLZ4Compressed = isLZ4Compressed ?? false; } public async hasMore() { @@ -31,9 +39,9 @@ export default class ArrowResultHandler implements IResultsProvider = []; - rowSet?.arrowBatches?.forEach((arrowBatch) => { - if (arrowBatch.batch) { - batches.push(arrowBatch.batch); + rowSet?.arrowBatches?.forEach(({ batch }) => { + if (batch) { + batches.push(this.isLZ4Compressed ? LZ4.decode(batch) : batch); } }); diff --git a/lib/result/CloudFetchResultHandler.ts b/lib/result/CloudFetchResultHandler.ts index f7b3f4cd..f6b40164 100644 --- a/lib/result/CloudFetchResultHandler.ts +++ b/lib/result/CloudFetchResultHandler.ts @@ -1,4 +1,4 @@ -import { Buffer } from 'buffer'; +import LZ4 from 'lz4'; import fetch, { RequestInfo, RequestInit } from 'node-fetch'; import { TRowSet, TSparkArrowResultLink } from '../../thrift/TCLIService_types'; import IClientContext from '../contracts/IClientContext'; @@ -9,13 +9,16 @@ export default class CloudFetchResultHandler implements IResultsProvider; + private readonly isLZ4Compressed: boolean; + private pendingLinks: Array = []; private downloadTasks: Array> = []; - constructor(context: IClientContext, source: IResultsProvider) { + constructor(context: IClientContext, source: IResultsProvider, isLZ4Compressed?: boolean) { this.context = context; this.source = source; + this.isLZ4Compressed = isLZ4Compressed ?? false; } public async hasMore() { @@ -42,7 +45,12 @@ export default class CloudFetchResultHandler implements IResultsProvider LZ4.decode(buffer)); + } + return batches; } private async downloadLink(link: TSparkArrowResultLink): Promise { diff --git a/package-lock.json b/package-lock.json index f7c37b09..5aff2a58 100644 --- a/package-lock.json +++ b/package-lock.json @@ -11,6 +11,7 @@ "dependencies": { "apache-arrow": "^13.0.0", "commander": "^9.3.0", + "lz4": "^0.6.5", "node-fetch": "^2.6.12", "node-int64": "^0.4.0", "open": "^8.4.2", @@ -21,6 +22,7 @@ "winston": "^3.8.2" }, "devDependencies": { + "@types/lz4": "^0.6.4", "@types/node": "^18.11.9", "@types/node-fetch": "^2.6.4", "@types/node-int64": "^0.4.29", @@ -899,6 +901,15 @@ "integrity": "sha512-dRLjCWHYg4oaA77cxO64oO+7JwCwnIzkZPdrrC71jQmQtlhM556pwKo5bUzqvZndkVbeFLIIi+9TC40JNF5hNQ==", "dev": true }, + "node_modules/@types/lz4": { + "version": "0.6.4", + "resolved": "https://registry.npmjs.org/@types/lz4/-/lz4-0.6.4.tgz", + "integrity": "sha512-vvxMIkowyHbY6zkCantyYwAK83N5E04bzFZSPJICG1SDpaL89L7lObs9DhLuhJhXHCQMkjzN3LYoAH6LjfwMFQ==", + "dev": true, + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@types/node": { "version": "18.11.18", "resolved": "https://registry.npmjs.org/@types/node/-/node-18.11.18.tgz", @@ -1458,6 +1469,25 @@ "integrity": "sha512-3oSeUO0TMV67hN1AmbXsK4yaqU7tjiHlbxRDZOpH0KW9+CeX4bRAaX0Anxt0tx2MrpRpWwQaPwIlISEJhYU5Pw==", "dev": true }, + "node_modules/base64-js": { + "version": "1.5.1", + "resolved": "https://registry.npmjs.org/base64-js/-/base64-js-1.5.1.tgz", + "integrity": "sha512-AKpaYlHn8t4SVbOHCy+b5+KKgvR4vrsD8vbvrbiQJps7fKDTkjkDry6ji0rUJjC0kzbNePLwzxq8iypo41qeWA==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ] + }, "node_modules/basic-ftp": { "version": "5.0.3", "resolved": "https://registry.npmjs.org/basic-ftp/-/basic-ftp-5.0.3.tgz", @@ -1536,6 +1566,29 @@ "node": "^6 || ^7 || ^8 || ^9 || ^10 || ^11 || ^12 || >=13.7" } }, + "node_modules/buffer": { + "version": "5.7.1", + "resolved": "https://registry.npmjs.org/buffer/-/buffer-5.7.1.tgz", + "integrity": "sha512-EHcyIPBQ4BSGlvjB16k5KgAJ27CIsHY/2JBmCRReo48y9rQ3MaUzWX3KVlBa4U7MyX02HdVj0K7C3WaB3ju7FQ==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ], + "dependencies": { + "base64-js": "^1.3.1", + "ieee754": "^1.1.13" + } + }, "node_modules/caching-transform": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/caching-transform/-/caching-transform-4.0.0.tgz", @@ -1885,6 +1938,11 @@ "node": ">= 8" } }, + "node_modules/cuint": { + "version": "0.2.2", + "resolved": "https://registry.npmjs.org/cuint/-/cuint-0.2.2.tgz", + "integrity": "sha512-d4ZVpCW31eWwCMe1YT3ur7mUDnTXbgwyzaL320DrcRT45rfjYxkt5QWLrmOJ+/UEAI2+fQgKe/fCjR8l4TpRgw==" + }, "node_modules/damerau-levenshtein": { "version": "1.0.8", "resolved": "https://registry.npmjs.org/damerau-levenshtein/-/damerau-levenshtein-1.0.8.tgz", @@ -3260,6 +3318,25 @@ "node": ">= 14" } }, + "node_modules/ieee754": { + "version": "1.2.1", + "resolved": "https://registry.npmjs.org/ieee754/-/ieee754-1.2.1.tgz", + "integrity": "sha512-dcyqhDvX1C46lXZcVqCpK+FtMRQVdIMN6/Df5js2zouUsqG7I6sFxitIC+7KYK29KdXOLHdu9zL4sFnoVQnqaA==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ] + }, "node_modules/ignore": { "version": "5.2.0", "resolved": "https://registry.npmjs.org/ignore/-/ignore-5.2.0.tgz", @@ -3994,6 +4071,21 @@ "node": ">=10" } }, + "node_modules/lz4": { + "version": "0.6.5", + "resolved": "https://registry.npmjs.org/lz4/-/lz4-0.6.5.tgz", + "integrity": "sha512-KSZcJU49QZOlJSItaeIU3p8WoAvkTmD9fJqeahQXNu1iQ/kR0/mQLdbrK8JY9MY8f6AhJoMrihp1nu1xDbscSQ==", + "hasInstallScript": true, + "dependencies": { + "buffer": "^5.2.1", + "cuint": "^0.2.2", + "nan": "^2.13.2", + "xxhashjs": "^0.2.2" + }, + "engines": { + "node": ">= 0.10" + } + }, "node_modules/make-dir": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/make-dir/-/make-dir-3.1.0.tgz", @@ -4166,6 +4258,11 @@ "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" }, + "node_modules/nan": { + "version": "2.18.0", + "resolved": "https://registry.npmjs.org/nan/-/nan-2.18.0.tgz", + "integrity": "sha512-W7tfG7vMOGtD30sHoZSSc/JVYiyDPEyQVso/Zz+/uQd0B0L46gtC+pHha5FFMRpil6fm/AoEcRWyOVi4+E/f8w==" + }, "node_modules/nanoid": { "version": "3.3.3", "resolved": "https://registry.npmjs.org/nanoid/-/nanoid-3.3.3.tgz", @@ -5994,6 +6091,14 @@ "async-limiter": "~1.0.0" } }, + "node_modules/xxhashjs": { + "version": "0.2.2", + "resolved": "https://registry.npmjs.org/xxhashjs/-/xxhashjs-0.2.2.tgz", + "integrity": "sha512-AkTuIuVTET12tpsVIQo+ZU6f/qDmKuRUcjaqR+OIvm+aCBsZ95i7UVY5WJ9TMsSaZ0DA2WxoZ4acu0sPH+OKAw==", + "dependencies": { + "cuint": "^0.2.2" + } + }, "node_modules/y18n": { "version": "5.0.8", "resolved": "https://registry.npmjs.org/y18n/-/y18n-5.0.8.tgz", @@ -6767,6 +6872,15 @@ "integrity": "sha512-dRLjCWHYg4oaA77cxO64oO+7JwCwnIzkZPdrrC71jQmQtlhM556pwKo5bUzqvZndkVbeFLIIi+9TC40JNF5hNQ==", "dev": true }, + "@types/lz4": { + "version": "0.6.4", + "resolved": "https://registry.npmjs.org/@types/lz4/-/lz4-0.6.4.tgz", + "integrity": "sha512-vvxMIkowyHbY6zkCantyYwAK83N5E04bzFZSPJICG1SDpaL89L7lObs9DhLuhJhXHCQMkjzN3LYoAH6LjfwMFQ==", + "dev": true, + "requires": { + "@types/node": "*" + } + }, "@types/node": { "version": "18.11.18", "resolved": "https://registry.npmjs.org/@types/node/-/node-18.11.18.tgz", @@ -7166,6 +7280,11 @@ "integrity": "sha512-3oSeUO0TMV67hN1AmbXsK4yaqU7tjiHlbxRDZOpH0KW9+CeX4bRAaX0Anxt0tx2MrpRpWwQaPwIlISEJhYU5Pw==", "dev": true }, + "base64-js": { + "version": "1.5.1", + "resolved": "https://registry.npmjs.org/base64-js/-/base64-js-1.5.1.tgz", + "integrity": "sha512-AKpaYlHn8t4SVbOHCy+b5+KKgvR4vrsD8vbvrbiQJps7fKDTkjkDry6ji0rUJjC0kzbNePLwzxq8iypo41qeWA==" + }, "basic-ftp": { "version": "5.0.3", "resolved": "https://registry.npmjs.org/basic-ftp/-/basic-ftp-5.0.3.tgz", @@ -7219,6 +7338,15 @@ "update-browserslist-db": "^1.0.5" } }, + "buffer": { + "version": "5.7.1", + "resolved": "https://registry.npmjs.org/buffer/-/buffer-5.7.1.tgz", + "integrity": "sha512-EHcyIPBQ4BSGlvjB16k5KgAJ27CIsHY/2JBmCRReo48y9rQ3MaUzWX3KVlBa4U7MyX02HdVj0K7C3WaB3ju7FQ==", + "requires": { + "base64-js": "^1.3.1", + "ieee754": "^1.1.13" + } + }, "caching-transform": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/caching-transform/-/caching-transform-4.0.0.tgz", @@ -7488,6 +7616,11 @@ "which": "^2.0.1" } }, + "cuint": { + "version": "0.2.2", + "resolved": "https://registry.npmjs.org/cuint/-/cuint-0.2.2.tgz", + "integrity": "sha512-d4ZVpCW31eWwCMe1YT3ur7mUDnTXbgwyzaL320DrcRT45rfjYxkt5QWLrmOJ+/UEAI2+fQgKe/fCjR8l4TpRgw==" + }, "damerau-levenshtein": { "version": "1.0.8", "resolved": "https://registry.npmjs.org/damerau-levenshtein/-/damerau-levenshtein-1.0.8.tgz", @@ -8497,6 +8630,11 @@ "debug": "4" } }, + "ieee754": { + "version": "1.2.1", + "resolved": "https://registry.npmjs.org/ieee754/-/ieee754-1.2.1.tgz", + "integrity": "sha512-dcyqhDvX1C46lXZcVqCpK+FtMRQVdIMN6/Df5js2zouUsqG7I6sFxitIC+7KYK29KdXOLHdu9zL4sFnoVQnqaA==" + }, "ignore": { "version": "5.2.0", "resolved": "https://registry.npmjs.org/ignore/-/ignore-5.2.0.tgz", @@ -9038,6 +9176,17 @@ "yallist": "^4.0.0" } }, + "lz4": { + "version": "0.6.5", + "resolved": "https://registry.npmjs.org/lz4/-/lz4-0.6.5.tgz", + "integrity": "sha512-KSZcJU49QZOlJSItaeIU3p8WoAvkTmD9fJqeahQXNu1iQ/kR0/mQLdbrK8JY9MY8f6AhJoMrihp1nu1xDbscSQ==", + "requires": { + "buffer": "^5.2.1", + "cuint": "^0.2.2", + "nan": "^2.13.2", + "xxhashjs": "^0.2.2" + } + }, "make-dir": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/make-dir/-/make-dir-3.1.0.tgz", @@ -9170,6 +9319,11 @@ "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" }, + "nan": { + "version": "2.18.0", + "resolved": "https://registry.npmjs.org/nan/-/nan-2.18.0.tgz", + "integrity": "sha512-W7tfG7vMOGtD30sHoZSSc/JVYiyDPEyQVso/Zz+/uQd0B0L46gtC+pHha5FFMRpil6fm/AoEcRWyOVi4+E/f8w==" + }, "nanoid": { "version": "3.3.3", "resolved": "https://registry.npmjs.org/nanoid/-/nanoid-3.3.3.tgz", @@ -10525,6 +10679,14 @@ "async-limiter": "~1.0.0" } }, + "xxhashjs": { + "version": "0.2.2", + "resolved": "https://registry.npmjs.org/xxhashjs/-/xxhashjs-0.2.2.tgz", + "integrity": "sha512-AkTuIuVTET12tpsVIQo+ZU6f/qDmKuRUcjaqR+OIvm+aCBsZ95i7UVY5WJ9TMsSaZ0DA2WxoZ4acu0sPH+OKAw==", + "requires": { + "cuint": "^0.2.2" + } + }, "y18n": { "version": "5.0.8", "resolved": "https://registry.npmjs.org/y18n/-/y18n-5.0.8.tgz", diff --git a/package.json b/package.json index ae568e8c..0ae4b5ed 100644 --- a/package.json +++ b/package.json @@ -47,6 +47,7 @@ ], "license": "Apache 2.0", "devDependencies": { + "@types/lz4": "^0.6.4", "@types/node": "^18.11.9", "@types/node-fetch": "^2.6.4", "@types/node-int64": "^0.4.29", @@ -73,6 +74,7 @@ "dependencies": { "apache-arrow": "^13.0.0", "commander": "^9.3.0", + "lz4": "^0.6.5", "node-fetch": "^2.6.12", "node-int64": "^0.4.0", "open": "^8.4.2", diff --git a/tests/e2e/arrow.test.js b/tests/e2e/arrow.test.js index 5de7e93e..f5127f6c 100644 --- a/tests/e2e/arrow.test.js +++ b/tests/e2e/arrow.test.js @@ -90,6 +90,7 @@ describe('Arrow support', () => { }, { arrowEnabled: false, + useLZ4Compression: false, }, ), ); @@ -112,6 +113,7 @@ describe('Arrow support', () => { { arrowEnabled: true, useArrowNativeTypes: false, + useLZ4Compression: false, }, ), ); @@ -134,6 +136,7 @@ describe('Arrow support', () => { { arrowEnabled: true, useArrowNativeTypes: true, + useLZ4Compression: false, }, ), ); @@ -143,6 +146,7 @@ describe('Arrow support', () => { const session = await openSession({ arrowEnabled: true, + useLZ4Compression: false, }); const operation = await session.executeStatement(` SELECT * @@ -168,4 +172,28 @@ describe('Arrow support', () => { expect(result.length).to.be.eq(rowsCount); }); + + it( + 'should handle LZ4 compressed data', + createTest( + async (session) => { + const operation = await session.executeStatement(`SELECT * FROM ${tableName}`); + const result = await operation.fetchAll(); + expect(fixArrowResult(result)).to.deep.equal(expectedArrow); + + const resultHandler = await operation.getResultHandler(); + expect(resultHandler).to.be.instanceof(ResultSlicer); + expect(resultHandler.source).to.be.instanceof(ArrowResultConverter); + expect(resultHandler.source.source).to.be.instanceof(ArrowResultHandler); + expect(resultHandler.source.source.isLZ4Compressed).to.be.true; + + await operation.close(); + }, + { + arrowEnabled: true, + useArrowNativeTypes: false, + useLZ4Compression: true, + }, + ), + ); }); diff --git a/tests/e2e/cloudfetch.test.js b/tests/e2e/cloudfetch.test.js index 4dc41e43..a877f5af 100644 --- a/tests/e2e/cloudfetch.test.js +++ b/tests/e2e/cloudfetch.test.js @@ -32,7 +32,10 @@ async function openSession(customConfig) { describe('CloudFetch', () => { it('should fetch data', async () => { const cloudFetchConcurrentDownloads = 5; - const session = await openSession({ cloudFetchConcurrentDownloads }); + const session = await openSession({ + cloudFetchConcurrentDownloads, + useLZ4Compression: false, + }); const queriedRowsCount = 10000000; // result has to be quite big to enable CloudFetch const operation = await session.executeStatement( @@ -86,4 +89,38 @@ describe('CloudFetch', () => { expect(fetchedRowCount).to.be.equal(queriedRowsCount); }); + + it('should handle LZ4 compressed data', async () => { + const cloudFetchConcurrentDownloads = 5; + const session = await openSession({ + cloudFetchConcurrentDownloads, + useLZ4Compression: true, + }); + + const queriedRowsCount = 10000000; // result has to be quite big to enable CloudFetch + const operation = await session.executeStatement( + ` + SELECT * + FROM range(0, ${queriedRowsCount}) AS t1 + LEFT JOIN (SELECT 1) AS t2 + `, + { + maxRows: null, // disable DirectResults + useCloudFetch: true, // tell server that we would like to use CloudFetch + }, + ); + + // We're going to examine some internals of operation, so explicitly wait for completion + await operation.finished(); + + // Check if we're actually getting data via CloudFetch + const resultHandler = await operation.getResultHandler(); + expect(resultHandler).to.be.instanceof(ResultSlicer); + expect(resultHandler.source).to.be.instanceof(ArrowResultConverter); + expect(resultHandler.source.source).to.be.instanceOf(CloudFetchResultHandler); + expect(resultHandler.source.source.isLZ4Compressed).to.be.true; + + const chunk = await operation.fetchChunk({ maxRows: 100000, disableBuffering: true }); + expect(chunk.length).to.be.gt(0); + }); }); diff --git a/tests/unit/result/ArrowResultHandler.test.js b/tests/unit/result/ArrowResultHandler.test.js index 9c24e680..eb70544b 100644 --- a/tests/unit/result/ArrowResultHandler.test.js +++ b/tests/unit/result/ArrowResultHandler.test.js @@ -1,6 +1,5 @@ const { expect } = require('chai'); -const fs = require('fs'); -const path = require('path'); +const LZ4 = require('lz4'); const ArrowResultHandler = require('../../../dist/result/ArrowResultHandler').default; const ResultsProviderMock = require('./fixtures/ResultsProviderMock'); @@ -30,6 +29,14 @@ const sampleRowSet1 = { arrowBatches: [sampleArrowBatch], }; +const sampleRowSet1LZ4Compressed = { + startRowOffset: 0, + arrowBatches: sampleRowSet1.arrowBatches.map((item) => ({ + ...item, + batch: LZ4.encode(item.batch), + })), +}; + const sampleRowSet2 = { startRowOffset: 0, arrowBatches: undefined, @@ -51,6 +58,32 @@ const sampleRowSet4 = { }; describe('ArrowResultHandler', () => { + it('should return data', async () => { + const context = {}; + const rowSetProvider = new ResultsProviderMock([sampleRowSet1]); + const result = new ArrowResultHandler(context, rowSetProvider, sampleArrowSchema); + + const batches = await result.fetchNext({ limit: 10000 }); + expect(await rowSetProvider.hasMore()).to.be.false; + expect(await result.hasMore()).to.be.false; + + const expectedBatches = sampleRowSet1.arrowBatches.map(({ batch }) => batch); + expect(batches).to.deep.eq([sampleArrowSchema, ...expectedBatches]); + }); + + it('should handle LZ4 compressed data', async () => { + const context = {}; + const rowSetProvider = new ResultsProviderMock([sampleRowSet1LZ4Compressed]); + const result = new ArrowResultHandler(context, rowSetProvider, sampleArrowSchema, true); + + const batches = await result.fetchNext({ limit: 10000 }); + expect(await rowSetProvider.hasMore()).to.be.false; + expect(await result.hasMore()).to.be.false; + + const expectedBatches = sampleRowSet1.arrowBatches.map(({ batch }) => batch); + expect(batches).to.deep.eq([sampleArrowSchema, ...expectedBatches]); + }); + it('should not buffer any data', async () => { const context = {}; const rowSetProvider = new ResultsProviderMock([sampleRowSet1]); diff --git a/tests/unit/result/CloudFetchResultHandler.test.js b/tests/unit/result/CloudFetchResultHandler.test.js index bbe9638f..44b8475e 100644 --- a/tests/unit/result/CloudFetchResultHandler.test.js +++ b/tests/unit/result/CloudFetchResultHandler.test.js @@ -1,6 +1,7 @@ const { expect, AssertionError } = require('chai'); const sinon = require('sinon'); const Int64 = require('node-int64'); +const LZ4 = require('lz4'); const CloudFetchResultHandler = require('../../../dist/result/CloudFetchResultHandler').default; const ResultsProviderMock = require('./fixtures/ResultsProviderMock'); const DBSQLClient = require('../../../dist/DBSQLClient').default; @@ -204,6 +205,36 @@ describe('CloudFetchResultHandler', () => { } }); + it('should handle LZ4 compressed data', async () => { + const clientConfig = DBSQLClient.getDefaultConfig(); + + const rowSetProvider = new ResultsProviderMock([sampleRowSet1]); + const context = { + getConfig: () => clientConfig, + }; + + const result = new CloudFetchResultHandler(context, rowSetProvider, true); + + const expectedBatch = Buffer.concat([sampleArrowSchema, sampleArrowBatch]); + + sinon.stub(result, 'fetch').returns( + Promise.resolve({ + ok: true, + status: 200, + statusText: 'OK', + arrayBuffer: async () => LZ4.encode(expectedBatch), + }), + ); + + expect(await rowSetProvider.hasMore()).to.be.true; + + const items = await result.fetchNext({ limit: 10000 }); + expect(await rowSetProvider.hasMore()).to.be.false; + + expect(result.fetch.called).to.be.true; + expect(items).to.deep.eq([expectedBatch]); + }); + it('should handle HTTP errors', async () => { const clientConfig = DBSQLClient.getDefaultConfig(); clientConfig.cloudFetchConcurrentDownloads = 1;