Skip to content

Commit

Permalink
[PECO-1260] Support results compression (#216)
Browse files (browse the repository at this point in the history)
* [PECO-1260] Support results compression

Signed-off-by: Levko Kravets <levko.ne@gmail.com>

* Fix existing tests

Signed-off-by: Levko Kravets <levko.ne@gmail.com>

* Add tests

Signed-off-by: Levko Kravets <levko.ne@gmail.com>

* Rename option to align with other connectors

Signed-off-by: Levko Kravets <levko.ne@gmail.com>

---------

Signed-off-by: Levko Kravets <levko.ne@gmail.com>
  • Loading branch information
kravets-levko committed Jan 30, 2024
1 parent 5b01d59 commit f3c53a5
Show file tree
Hide file tree
Showing 12 changed files with 327 additions and 13 deletions.
2 changes: 2 additions & 0 deletions lib/DBSQLClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I

useCloudFetch: false,
cloudFetchConcurrentDownloads: 10,

useLZ4Compression: true,
};
}

Expand Down
4 changes: 2 additions & 2 deletions lib/DBSQLOperation/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -377,14 +377,14 @@ export default class DBSQLOperation implements IOperation {
case TSparkRowSetType.ARROW_BASED_SET:
resultSource = new ArrowResultConverter(
this.context,
new ArrowResultHandler(this.context, this._data, metadata.arrowSchema),
new ArrowResultHandler(this.context, this._data, metadata.arrowSchema, metadata.lz4Compressed),
metadata.schema,
);
break;
case TSparkRowSetType.URL_BASED_SET:
resultSource = new ArrowResultConverter(
this.context,
new CloudFetchResultHandler(this.context, this._data),
new CloudFetchResultHandler(this.context, this._data, metadata.lz4Compressed),
metadata.schema,
);
break;
Expand Down
1 change: 1 addition & 0 deletions lib/DBSQLSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ export default class DBSQLSession implements IDBSQLSession {
...getArrowOptions(clientConfig),
canDownloadResult: options.useCloudFetch ?? clientConfig.useCloudFetch,
parameters: getQueryParameters(this.sessionHandle, options.namedParameters, options.ordinalParameters),
canDecompressLZ4Result: clientConfig.useLZ4Compression,
});
const response = await this.handleResponse(operationPromise);
const operation = this.createOperation(response);
Expand Down
2 changes: 2 additions & 0 deletions lib/contracts/IClientContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ export interface ClientConfig {

useCloudFetch: boolean;
cloudFetchConcurrentDownloads: number;

useLZ4Compression: boolean;
}

export default interface IClientContext {
Expand Down
18 changes: 13 additions & 5 deletions lib/result/ArrowResultHandler.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Buffer } from 'buffer';
import LZ4 from 'lz4';
import { TRowSet } from '../../thrift/TCLIService_types';
import IClientContext from '../contracts/IClientContext';
import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
Expand All @@ -10,10 +10,18 @@ export default class ArrowResultHandler implements IResultsProvider<Array<Buffer

private readonly arrowSchema?: Buffer;

constructor(context: IClientContext, source: IResultsProvider<TRowSet | undefined>, arrowSchema?: Buffer) {
private readonly isLZ4Compressed: boolean;

/**
 * Creates a results handler that pulls Arrow batches out of the row sets
 * produced by `source`.
 *
 * @param context - client context; stored on the instance
 * @param source - provider of `TRowSet` chunks read by `fetchNext`
 * @param arrowSchema - optional Arrow schema buffer; stored as given
 * @param isLZ4Compressed - when `true`, every batch is run through `LZ4.decode`
 *   before being collected; treated as `false` when omitted
 */
constructor(
context: IClientContext,
source: IResultsProvider<TRowSet | undefined>,
arrowSchema?: Buffer,
isLZ4Compressed?: boolean,
) {
this.context = context;
this.source = source;
this.arrowSchema = arrowSchema;
// The flag is optional: `??` turns an omitted (undefined) or null value into `false`.
this.isLZ4Compressed = isLZ4Compressed ?? false;
}

public async hasMore() {
Expand All @@ -31,9 +39,9 @@ export default class ArrowResultHandler implements IResultsProvider<Array<Buffer
const rowSet = await this.source.fetchNext(options);

const batches: Array<Buffer> = [];
rowSet?.arrowBatches?.forEach((arrowBatch) => {
if (arrowBatch.batch) {
batches.push(arrowBatch.batch);
rowSet?.arrowBatches?.forEach(({ batch }) => {
if (batch) {
batches.push(this.isLZ4Compressed ? LZ4.decode(batch) : batch);
}
});

Expand Down
14 changes: 11 additions & 3 deletions lib/result/CloudFetchResultHandler.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Buffer } from 'buffer';
import LZ4 from 'lz4';
import fetch, { RequestInfo, RequestInit } from 'node-fetch';
import { TRowSet, TSparkArrowResultLink } from '../../thrift/TCLIService_types';
import IClientContext from '../contracts/IClientContext';
Expand All @@ -9,13 +9,16 @@ export default class CloudFetchResultHandler implements IResultsProvider<Array<B

private readonly source: IResultsProvider<TRowSet | undefined>;

private readonly isLZ4Compressed: boolean;

private pendingLinks: Array<TSparkArrowResultLink> = [];

private downloadTasks: Array<Promise<Buffer>> = [];

constructor(context: IClientContext, source: IResultsProvider<TRowSet | undefined>) {
/**
 * Creates a results handler for link-based (cloud fetch) result sets.
 *
 * @param context - client context; stored on the instance
 * @param source - provider of `TRowSet` chunks; stored on the instance
 * @param isLZ4Compressed - when `true`, each buffer taken from the download
 *   queue is run through `LZ4.decode` before `fetchNext` returns it; treated
 *   as `false` when omitted
 */
constructor(context: IClientContext, source: IResultsProvider<TRowSet | undefined>, isLZ4Compressed?: boolean) {
this.context = context;
this.source = source;
// The flag is optional: `??` turns an omitted (undefined) or null value into `false`.
this.isLZ4Compressed = isLZ4Compressed ?? false;
}

public async hasMore() {
Expand All @@ -42,7 +45,12 @@ export default class CloudFetchResultHandler implements IResultsProvider<Array<B
}

const batch = await this.downloadTasks.shift();
return batch ? [batch] : [];
const batches = batch ? [batch] : [];

if (this.isLZ4Compressed) {
return batches.map((buffer) => LZ4.decode(buffer));
}
return batches;
}

private async downloadLink(link: TSparkArrowResultLink): Promise<Buffer> {
Expand Down
Loading

0 comments on commit f3c53a5

Please sign in to comment.