Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PECO-1532] Ignore the excess records in query results #239

Merged
merged 5 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 78 additions & 37 deletions lib/result/ArrowResultConverter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
import { TGetResultSetMetadataResp, TColumnDesc } from '../../thrift/TCLIService_types';
import IClientContext from '../contracts/IClientContext';
import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
import { getSchemaColumns, convertThriftValue } from './utils';
import { ArrowBatch, getSchemaColumns, convertThriftValue } from './utils';

const { isArrowBigNumSymbol, bigNumToBigInt } = arrowUtils;

Expand All @@ -26,15 +26,23 @@ type ArrowSchemaField = Field<DataType<Type, TypeMap>>;
export default class ArrowResultConverter implements IResultsProvider<Array<any>> {
protected readonly context: IClientContext;

private readonly source: IResultsProvider<Array<Buffer>>;
private readonly source: IResultsProvider<ArrowBatch>;

private readonly schema: Array<TColumnDesc>;

private reader?: IterableIterator<RecordBatch<TypeMap>>;
private recordBatchReader?: IterableIterator<RecordBatch<TypeMap>>;

private pendingRecordBatch?: RecordBatch<TypeMap>;
// Remaining rows in current Arrow batch (not the record batch!)
private remainingRows: number = 0;

constructor(context: IClientContext, source: IResultsProvider<Array<Buffer>>, { schema }: TGetResultSetMetadataResp) {
// This is the next (!!) record batch to be read. It is unset only in two cases:
// - prior to the first call to `fetchNext`
// - when no more data available
// This field is primarily used by a `hasMore`, so it can tell if next `fetchNext` will
// actually return a non-empty result
private prefetchedRecordBatch?: RecordBatch<TypeMap>;

constructor(context: IClientContext, source: IResultsProvider<ArrowBatch>, { schema }: TGetResultSetMetadataResp) {
this.context = context;
this.source = source;
this.schema = getSchemaColumns(schema);
Expand All @@ -44,7 +52,7 @@ export default class ArrowResultConverter implements IResultsProvider<Array<any>
if (this.schema.length === 0) {
return false;
}
if (this.pendingRecordBatch) {
if (this.prefetchedRecordBatch) {
return true;
}
return this.source.hasMore();
Expand All @@ -55,47 +63,80 @@ export default class ArrowResultConverter implements IResultsProvider<Array<any>
return [];
}

// eslint-disable-next-line no-constant-condition
while (true) {
// It's not possible to know if iterator has more items until trying
// to get the next item. But we need to know if iterator is empty right
// after getting the next item. Therefore, after creating the iterator,
// we get one item more and store it in `pendingRecordBatch`. Next time,
// we use that stored item, and prefetch the next one. Prefetched item
// is therefore the next item we are going to return, so it can be used
// to know if we actually can return anything next time
const recordBatch = this.pendingRecordBatch;
this.pendingRecordBatch = this.prefetch();

if (recordBatch) {
const table = new Table(recordBatch);
return this.getRows(table.schema, table.toArray());
// It's not possible to know if iterator has more items until trying to get the next item.
// So each time we read one batch ahead and store it, but process the batch prefetched on
// a previous `fetchNext` call. Because we actually already have the next item - it's easy
// to tell if the subsequent `fetchNext` will be able to read anything, and `hasMore` logic
// becomes trivial

// This prefetch handles a first call to `fetchNext`, when all the internal fields are not initialized yet.
// On subsequent calls to `fetchNext` it will do nothing
await this.prefetch(options);

if (this.prefetchedRecordBatch) {
// Consume a record batch fetched during previous call to `fetchNext`
const table = new Table(this.prefetchedRecordBatch);
this.prefetchedRecordBatch = undefined;
// Get table rows, but not more than remaining count
const arrowRows = table.toArray().slice(0, this.remainingRows);
const result = this.getRows(table.schema, arrowRows);

// Reduce remaining rows count by a count of rows we just processed.
// If the remaining count reached zero - we're done with current arrow
// batch, so discard the batch reader
this.remainingRows -= result.length;
if (this.remainingRows === 0) {
this.recordBatchReader = undefined;
}

// eslint-disable-next-line no-await-in-loop
const batches = await this.source.fetchNext(options);
if (batches.length === 0) {
this.reader = undefined;
break;
}
// Prefetch the next record batch
await this.prefetch(options);

const reader = RecordBatchReader.from<TypeMap>(batches);
this.reader = reader[Symbol.iterator]();
this.pendingRecordBatch = this.prefetch();
return result;
}

return [];
}

private prefetch(): RecordBatch<TypeMap> | undefined {
const item = this.reader?.next() ?? { done: true, value: undefined };
// This method tries to read one more record batch and store it in `prefetchedRecordBatch` field.
// If `prefetchedRecordBatch` is already non-empty - the method does nothing.
// This method pulls the next item from source if needed, initializes a record batch reader and
// gets the next item from it - until either reaches end of data or finds a non-empty record batch
private async prefetch(options: ResultsProviderFetchNextOptions) {
// This loop will be executed until a next non-empty record batch is retrieved
// Another implicit loop condition (end of data) is checked in the loop body
while (!this.prefetchedRecordBatch) {
// First, try to fetch next item from source and initialize record batch reader.
// If source has no more data - exit prematurely
if (!this.recordBatchReader) {
const sourceHasMore = await this.source.hasMore(); // eslint-disable-line no-await-in-loop
if (!sourceHasMore) {
return;
}

const arrowBatch = await this.source.fetchNext(options); // eslint-disable-line no-await-in-loop
if (arrowBatch.batches.length > 0 && arrowBatch.rowCount > 0) {
const reader = RecordBatchReader.from<TypeMap>(arrowBatch.batches);
this.recordBatchReader = reader[Symbol.iterator]();
this.remainingRows = arrowBatch.rowCount;
}
}

if (item.done || item.value === undefined) {
this.reader = undefined;
return undefined;
// Try to get a next item from current record batch reader. The reader may be unavailable at this point -
// in this case we fall back to a "done" state, and the `while` loop will do one more iteration attempting
// to create a new reader. Eventually it will either succeed or reach end of source. This scenario also
// handles readers which are already empty
const item = this.recordBatchReader?.next() ?? { done: true, value: undefined };
if (item.done || item.value === undefined) {
this.recordBatchReader = undefined;
} else {
// Skip empty batches
// eslint-disable-next-line no-lonely-if
if (item.value.numRows > 0) {
this.prefetchedRecordBatch = item.value;
}
}
}

return item.value;
}

private getRows(schema: ArrowSchema, rows: Array<StructRow | MapRow>): Array<any> {
Expand Down
23 changes: 17 additions & 6 deletions lib/result/ArrowResultHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ import LZ4 from 'lz4';
import { TGetResultSetMetadataResp, TRowSet } from '../../thrift/TCLIService_types';
import IClientContext from '../contracts/IClientContext';
import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
import { hiveSchemaToArrowSchema } from './utils';
import { ArrowBatch, hiveSchemaToArrowSchema } from './utils';

export default class ArrowResultHandler implements IResultsProvider<Array<Buffer>> {
export default class ArrowResultHandler implements IResultsProvider<ArrowBatch> {
protected readonly context: IClientContext;

private readonly source: IResultsProvider<TRowSet | undefined>;
Expand Down Expand Up @@ -35,22 +35,33 @@ export default class ArrowResultHandler implements IResultsProvider<Array<Buffer

public async fetchNext(options: ResultsProviderFetchNextOptions) {
if (!this.arrowSchema) {
return [];
return {
batches: [],
rowCount: 0,
};
}

const rowSet = await this.source.fetchNext(options);

const batches: Array<Buffer> = [];
rowSet?.arrowBatches?.forEach(({ batch }) => {
let totalRowCount = 0;
rowSet?.arrowBatches?.forEach(({ batch, rowCount }) => {
if (batch) {
batches.push(this.isLZ4Compressed ? LZ4.decode(batch) : batch);
totalRowCount += rowCount.toNumber(true);
}
});

if (batches.length === 0) {
return [];
return {
batches: [],
rowCount: 0,
};
}

return [this.arrowSchema, ...batches];
return {
batches: [this.arrowSchema, ...batches],
rowCount: totalRowCount,
};
}
}
23 changes: 16 additions & 7 deletions lib/result/CloudFetchResultHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ import fetch, { RequestInfo, RequestInit, Request } from 'node-fetch';
import { TGetResultSetMetadataResp, TRowSet, TSparkArrowResultLink } from '../../thrift/TCLIService_types';
import IClientContext from '../contracts/IClientContext';
import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
import { ArrowBatch } from './utils';

export default class CloudFetchResultHandler implements IResultsProvider<Array<Buffer>> {
export default class CloudFetchResultHandler implements IResultsProvider<ArrowBatch> {
protected readonly context: IClientContext;

private readonly source: IResultsProvider<TRowSet | undefined>;
Expand All @@ -13,7 +14,7 @@ export default class CloudFetchResultHandler implements IResultsProvider<Array<B

private pendingLinks: Array<TSparkArrowResultLink> = [];

private downloadTasks: Array<Promise<Buffer>> = [];
private downloadTasks: Array<Promise<ArrowBatch>> = [];

constructor(
context: IClientContext,
Expand Down Expand Up @@ -49,15 +50,20 @@ export default class CloudFetchResultHandler implements IResultsProvider<Array<B
}

const batch = await this.downloadTasks.shift();
const batches = batch ? [batch] : [];
if (!batch) {
return {
batches: [],
rowCount: 0,
};
}

if (this.isLZ4Compressed) {
return batches.map((buffer) => LZ4.decode(buffer));
batch.batches = batch.batches.map((buffer) => LZ4.decode(buffer));
}
return batches;
return batch;
}

private async downloadLink(link: TSparkArrowResultLink): Promise<Buffer> {
private async downloadLink(link: TSparkArrowResultLink): Promise<ArrowBatch> {
if (Date.now() >= link.expiryTime.toNumber()) {
throw new Error('CloudFetch link has expired');
}
Expand All @@ -68,7 +74,10 @@ export default class CloudFetchResultHandler implements IResultsProvider<Array<B
}

const result = await response.arrayBuffer();
return Buffer.from(result);
return {
batches: [Buffer.from(result)],
rowCount: link.rowCount.toNumber(true),
};
}

private async fetch(url: RequestInfo, init?: RequestInit) {
Expand Down
5 changes: 5 additions & 0 deletions lib/result/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ import {
import { TTableSchema, TColumnDesc, TPrimitiveTypeEntry, TTypeId } from '../../thrift/TCLIService_types';
import HiveDriverError from '../errors/HiveDriverError';

/**
 * A chunk of Arrow-formatted query results: raw Arrow IPC buffers plus the
 * number of valid data rows they contain.
 *
 * `batches` holds serialized Arrow buffers — typically the schema buffer
 * followed by one or more record-batch buffers (see how `ArrowResultHandler`
 * prepends `arrowSchema` when assembling the result).
 *
 * `rowCount` is the total number of valid rows across `batches`. Consumers
 * (e.g. `ArrowResultConverter`) use it to trim excess records that the Arrow
 * buffers may contain beyond the reported row count (PECO-1532).
 */
export interface ArrowBatch {
  // Serialized Arrow IPC buffers (schema + record batches)
  batches: Array<Buffer>;
  // Total count of valid data rows represented by `batches`
  rowCount: number;
}

export function getSchemaColumns(schema?: TTableSchema): Array<TColumnDesc> {
if (!schema) {
return [];
Expand Down
Loading
Loading