Commit 85d0e00

fix uniform chunk distribution when the new chunks are longer than the current chunks
1 parent: 8218f40

1 file changed: js/src/util/recordbatch.ts (24 additions, 14 deletions)
@@ -15,25 +15,31 @@
 // specific language governing permissions and limitations
 // under the License.
 
-import { Data } from '../data';
 import { Column } from '../column';
 import { Schema } from '../schema';
 import { Vector } from '../vector';
 import { DataType } from '../type';
+import { Data, Buffers } from '../data';
 import { Chunked } from '../vector/chunked';
 import { RecordBatch } from '../recordbatch';
 
+const noopBuf = new Uint8Array(0);
+const nullBufs = (bitmapLength: number) => <unknown> [
+    noopBuf, noopBuf, new Uint8Array(bitmapLength), noopBuf
+] as Buffers<any>;
+
 /** @ignore */
 export function alignChunkLengths<T extends { [key: string]: DataType; } = any>(schema: Schema, chunks: Data<T[keyof T]>[], length = chunks.reduce((l, c) => Math.max(l, c.length), 0)) {
     const bitmapLength = ((length + 63) & ~63) >> 3;
     return chunks.map((chunk, idx) => {
         const chunkLength = chunk ? chunk.length : 0;
         if (chunkLength === length) { return chunk; }
         const field = schema.fields[idx];
-        if (!field.nullable) { schema.fields[idx] = field.clone({ nullable: true }); }
-        return chunk
-            ? chunk._changeLengthAndBackfillNullBitmap(length)
-            : new Data(field.type, 0, length, length, [,, new Uint8Array(bitmapLength)]);
+        if (!field.nullable) {
+            schema.fields[idx] = field.clone({ nullable: true });
+        }
+        return chunk ? chunk._changeLengthAndBackfillNullBitmap(length)
+                     : new Data(field.type, 0, length, length, nullBufs(bitmapLength));
     });
 }
 
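The new nullBufs helper replaces the sparse array literal [,, new Uint8Array(bitmapLength)]. Both forms place the validity bitmap at index 2 of the buffers array, but the sparse literal leaves holes at the other slots. A standalone sketch (plain TypeScript, not part of the commit) of why the dense four-slot tuple is sturdier:

// Sparse vs. dense buffer arrays: both put the validity bitmap at index 2,
// but the sparse literal has holes at indices 0 and 1 and no index 3 at all.
const bitmapLength = 8;
const sparse = [, , new Uint8Array(bitmapLength)];
const dense  = [new Uint8Array(0), new Uint8Array(0),
                new Uint8Array(bitmapLength), new Uint8Array(0)];

// Array methods such as map skip holes entirely, so code that walks the
// buffers observes different shapes for the two forms:
console.log(sparse.map((b) => b.byteLength)); // [ <2 empty items>, 8 ]
console.log(dense.map((b) => b.byteLength));  // [ 0, 0, 8, 0 ]
console.log(0 in sparse, 3 in sparse);        // false false
console.log(0 in dense, 3 in dense);          // true true
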
@@ -57,10 +63,11 @@ export function uniformlyDistributeChunksAcrossRecordBatches<T extends { [key: s
     for (let chunkIndex = -1; ++chunkIndex < memo.numChunks;) {
 
         const [sameLength, batchLength] = chunks.reduce((memo, chunks) => {
+            const [same, batchLength] = memo;
             const chunk = chunks[chunkIndex];
-            const [sameLength, batchLength] = memo;
-            memo[1] = Math.min(batchLength, chunk ? chunk.length : batchLength);
-            sameLength && isFinite(batchLength) && (memo[0] = batchLength === memo[1]);
+            const chunkLength = chunk ? chunk.length : batchLength;
+            isFinite(batchLength) && same && (memo[0] = chunkLength === batchLength);
+            memo[1] = Math.min(batchLength, chunkLength);
             return memo;
         }, [true, Number.POSITIVE_INFINITY] as [boolean, number]);
 
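This reduce rewrite is the heart of the fix. The old version shrank the running minimum first and then compared the old minimum to the new one, so a chunk longer than the current minimum never lowered the minimum and the columns were wrongly reported as same-length. The new version compares each chunk's length against the previous minimum before updating it. A standalone sketch (hypothetical oldStep/newStep names, operating on bare lengths) that reproduces the difference:

// Per-column chunk lengths at one chunk index; the second chunk is LONGER.
const lengths = [3, 5];

// Old logic: shrink the minimum first, then compare old vs. new minimum.
function oldStep(lengths: number[]) {
    return lengths.reduce((memo, chunkLength) => {
        const [sameLength, batchLength] = memo;
        memo[1] = Math.min(batchLength, chunkLength);
        // a longer chunk never lowers the minimum, so this stays true
        sameLength && isFinite(batchLength) && (memo[0] = batchLength === memo[1]);
        return memo;
    }, [true, Number.POSITIVE_INFINITY] as [boolean, number]);
}

// New logic: compare this chunk's length to the previous minimum first.
function newStep(lengths: number[]) {
    return lengths.reduce((memo, chunkLength) => {
        const [same, batchLength] = memo;
        isFinite(batchLength) && same && (memo[0] = chunkLength === batchLength);
        memo[1] = Math.min(batchLength, chunkLength);
        return memo;
    }, [true, Number.POSITIVE_INFINITY] as [boolean, number]);
}

console.log(oldStep(lengths)); // [ true, 3 ]  -> wrongly "all chunks same length"
console.log(newStep(lengths)); // [ false, 3 ] -> mismatch detected

When sameLength is false the caller takes the slicing/backfilling path instead of the fast path, which is why the bug only surfaced when a later chunk was longer than the running minimum.
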
@@ -80,8 +87,10 @@ function gatherChunksSameLength(schema: Schema, chunkIndex: number, length: numb
         const chunk = chunks[chunkIndex];
         if (chunk) { return chunk; }
         const field = schema.fields[idx];
-        if (!field.nullable) { schema.fields[idx] = field.clone({ nullable: true }); }
-        return new Data(field.type, 0, length, length, [,, new Uint8Array(bitmapLength)]);
+        if (!field.nullable) {
+            schema.fields[idx] = field.clone({ nullable: true });
+        }
+        return new Data(field.type, 0, length, length, nullBufs(bitmapLength));
     });
 }
 
@@ -98,9 +107,10 @@ function gatherChunksDiffLength(schema: Schema, chunkIndex: number, length: numb
             return chunk.slice(0, length);
         }
         const field = schema.fields[idx];
-        if (!field.nullable) { schema.fields[idx] = field.clone({ nullable: true }); }
-        return chunk
-            ? chunk._changeLengthAndBackfillNullBitmap(length)
-            : new Data(field.type, 0, length, length, [,, new Uint8Array(bitmapLength)]);
+        if (!field.nullable) {
+            schema.fields[idx] = field.clone({ nullable: true });
+        }
+        return chunk ? chunk._changeLengthAndBackfillNullBitmap(length)
+                     : new Data(field.type, 0, length, length, nullBufs(bitmapLength));
     });
 }
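
All three touched sites size the placeholder validity bitmap with the same expression. A quick standalone check (not from the commit) of what ((length + 63) & ~63) >> 3 computes, rounding the row count up to whole 64-bit words before converting bits to bytes:

const bitmapBytes = (length: number) => ((length + 63) & ~63) >> 3;

console.log(bitmapBytes(1));    // 8   -> 1 row rounds up to 64 bits = 8 bytes
console.log(bitmapBytes(64));   // 8   -> 64 rows fit exactly in 64 bits
console.log(bitmapBytes(65));   // 16  -> 65 rows round up to 128 bits = 16 bytes
console.log(bitmapBytes(1000)); // 128 -> 1000 rows round up to 1024 bits = 128 bytes

Passing length as the nullCount argument alongside the zeroed bitmap marks every slot of the placeholder column null, which is also why each site flips the field to nullable first.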
