Skip to content

Commit 072b7d6

Browse files
julienledemwesm
authored andcommitted
ARROW-395: Arrow file format writes record batches in reverse order.
Author: Julien Le Dem <julien@dremio.com> Closes #220 from julienledem/rb_order and squashes the following commits: ae5b7f8 [Julien Le Dem] ARROW-395: Arrow file format writes record batches in reverse order.
1 parent 859018b commit 072b7d6

File tree

2 files changed

+23
-19
lines changed

2 files changed

+23
-19
lines changed

java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package org.apache.arrow.vector.file;
1919

20+
import static org.apache.arrow.vector.schema.FBSerializables.writeAllStructsToVector;
21+
2022
import java.util.ArrayList;
2123
import java.util.List;
2224

@@ -52,10 +54,10 @@ public ArrowFooter(Footer footer) {
5254

5355
private static List<ArrowBlock> recordBatches(Footer footer) {
5456
List<ArrowBlock> recordBatches = new ArrayList<>();
55-
Block tempBLock = new Block();
57+
Block tempBlock = new Block();
5658
int recordBatchesLength = footer.recordBatchesLength();
5759
for (int i = 0; i < recordBatchesLength; i++) {
58-
Block block = footer.recordBatches(tempBLock, i);
60+
Block block = footer.recordBatches(tempBlock, i);
5961
recordBatches.add(new ArrowBlock(block.offset(), block.metaDataLength(), block.bodyLength()));
6062
}
6163
return recordBatches;
@@ -88,23 +90,16 @@ public List<ArrowBlock> getRecordBatches() {
8890
public int writeTo(FlatBufferBuilder builder) {
8991
int schemaIndex = schema.getSchema(builder);
9092
Footer.startDictionariesVector(builder, dictionaries.size());
91-
int dicsOffset = endVector(builder, dictionaries);
93+
int dicsOffset = writeAllStructsToVector(builder, dictionaries);
9294
Footer.startRecordBatchesVector(builder, recordBatches.size());
93-
int rbsOffset = endVector(builder, recordBatches);
95+
int rbsOffset = writeAllStructsToVector(builder, recordBatches);
9496
Footer.startFooter(builder);
9597
Footer.addSchema(builder, schemaIndex);
9698
Footer.addDictionaries(builder, dicsOffset);
9799
Footer.addRecordBatches(builder, rbsOffset);
98100
return Footer.endFooter(builder);
99101
}
100102

101-
private int endVector(FlatBufferBuilder builder, List<ArrowBlock> blocks) {
102-
for (ArrowBlock block : blocks) {
103-
block.writeTo(builder);
104-
}
105-
return builder.endVector();
106-
}
107-
108103
@Override
109104
public int hashCode() {
110105
final int prime = 31;

java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -161,24 +161,27 @@ public void testWriteReadComplex() throws IOException {
161161
@Test
162162
public void testWriteReadMultipleRBs() throws IOException {
163163
File file = new File("target/mytest_multiple.arrow");
164-
int count = COUNT;
164+
int[] counts = { 10, 5 };
165165

166166
// write
167167
try (
168168
BufferAllocator originalVectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
169169
MapVector parent = new MapVector("parent", originalVectorAllocator, null);
170170
FileOutputStream fileOutputStream = new FileOutputStream(file);) {
171-
writeData(count, parent);
172-
VectorUnloader vectorUnloader = newVectorUnloader(parent.getChild("root"));
173-
Schema schema = vectorUnloader.getSchema();
171+
writeData(counts[0], parent);
172+
VectorUnloader vectorUnloader0 = newVectorUnloader(parent.getChild("root"));
173+
Schema schema = vectorUnloader0.getSchema();
174174
Assert.assertEquals(2, schema.getFields().size());
175175
try (ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);) {
176-
try (ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch()) {
176+
try (ArrowRecordBatch recordBatch = vectorUnloader0.getRecordBatch()) {
177+
Assert.assertEquals("RB #0", counts[0], recordBatch.getLength());
177178
arrowWriter.writeRecordBatch(recordBatch);
178179
}
179180
parent.allocateNew();
180-
writeData(count, parent);
181-
try (ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch()) {
181+
writeData(counts[1], parent); // if we write the same data we don't catch that the metadata is stored in the wrong order.
182+
VectorUnloader vectorUnloader1 = newVectorUnloader(parent.getChild("root"));
183+
try (ArrowRecordBatch recordBatch = vectorUnloader1.getRecordBatch()) {
184+
Assert.assertEquals("RB #1", counts[1], recordBatch.getLength());
182185
arrowWriter.writeRecordBatch(recordBatch);
183186
}
184187
}
@@ -195,21 +198,27 @@ public void testWriteReadMultipleRBs() throws IOException {
195198
ArrowFooter footer = arrowReader.readFooter();
196199
Schema schema = footer.getSchema();
197200
LOGGER.debug("reading schema: " + schema);
201+
int i = 0;
198202
try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator);) {
199203
VectorLoader vectorLoader = new VectorLoader(root);
200204
List<ArrowBlock> recordBatches = footer.getRecordBatches();
201205
Assert.assertEquals(2, recordBatches.size());
206+
long previousOffset = 0;
202207
for (ArrowBlock rbBlock : recordBatches) {
208+
Assert.assertTrue(rbBlock.getOffset() + " > " + previousOffset, rbBlock.getOffset() > previousOffset);
209+
previousOffset = rbBlock.getOffset();
203210
Assert.assertEquals(0, rbBlock.getOffset() % 8);
204211
Assert.assertEquals(0, rbBlock.getMetadataLength() % 8);
205212
try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
213+
Assert.assertEquals("RB #" + i, counts[i], recordBatch.getLength());
206214
List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
207215
for (ArrowBuffer arrowBuffer : buffersLayout) {
208216
Assert.assertEquals(0, arrowBuffer.getOffset() % 8);
209217
}
210218
vectorLoader.load(recordBatch);
211-
validateContent(count, root);
219+
validateContent(counts[i], root);
212220
}
221+
++i;
213222
}
214223
}
215224
}

0 commit comments

Comments
 (0)