Skip to content

Commit 6996c17

Browse files
julienledem authored and wesm committed
ARROW-312: [Java] IPC file round trip tool for integration testing
Author: Julien Le Dem <julien@dremio.com>
Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #186 from wesm/roundtrip-tool and squashes the following commits:

aee552a [Julien Le Dem] missing file
9d5c078 [Julien Le Dem] fix read-write bug
7f20b36 [Julien Le Dem] simple roundtrip
a04091f [Wes McKinney] Drafting file round trip helper executable
1 parent 79344b3 commit 6996c17

File tree

15 files changed

+681
-131
lines changed

15 files changed

+681
-131
lines changed

java/memory/src/main/java/io/netty/buffer/ArrowBuf.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,10 @@ public ArrowBuf retain(BufferAllocator target) {
179179
historicalLog.recordEvent("retain(%s)", target.getName());
180180
}
181181
final BufferLedger otherLedger = this.ledger.getLedgerForAllocator(target);
182-
return otherLedger.newArrowBuf(offset, length, null);
182+
ArrowBuf newArrowBuf = otherLedger.newArrowBuf(offset, length, null);
183+
newArrowBuf.readerIndex(this.readerIndex);
184+
newArrowBuf.writerIndex(this.writerIndex);
185+
return newArrowBuf;
183186
}
184187

185188
/**
@@ -214,6 +217,8 @@ public TransferResult transferOwnership(BufferAllocator target) {
214217

215218
final BufferLedger otherLedger = this.ledger.getLedgerForAllocator(target);
216219
final ArrowBuf newBuf = otherLedger.newArrowBuf(offset, length, null);
220+
newBuf.readerIndex(this.readerIndex);
221+
newBuf.writerIndex(this.writerIndex);
217222
final boolean allocationFit = this.ledger.transferBalance(otherLedger);
218223
return new TransferResult(allocationFit, newBuf);
219224
}

java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,13 @@
2222
import static org.junit.Assert.assertNotNull;
2323
import static org.junit.Assert.assertTrue;
2424
import static org.junit.Assert.fail;
25-
import io.netty.buffer.ArrowBuf;
26-
import io.netty.buffer.ArrowBuf.TransferResult;
2725

28-
import org.apache.arrow.memory.AllocationReservation;
29-
import org.apache.arrow.memory.BufferAllocator;
30-
import org.apache.arrow.memory.OutOfMemoryException;
31-
import org.apache.arrow.memory.RootAllocator;
3226
import org.junit.Ignore;
3327
import org.junit.Test;
3428

29+
import io.netty.buffer.ArrowBuf;
30+
import io.netty.buffer.ArrowBuf.TransferResult;
31+
3532
public class TestBaseAllocator {
3633
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestBaseAllocator.class);
3734

@@ -134,6 +131,7 @@ public void testAllocator_transferOwnership() throws Exception {
134131
final ArrowBuf arrowBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 4);
135132
rootAllocator.verify();
136133
TransferResult transferOwnership = arrowBuf1.transferOwnership(childAllocator2);
134+
assertEquiv(arrowBuf1, transferOwnership.buffer);
137135
final boolean allocationFit = transferOwnership.allocationFit;
138136
rootAllocator.verify();
139137
assertTrue(allocationFit);
@@ -160,6 +158,7 @@ public void testAllocator_shareOwnership() throws Exception {
160158
rootAllocator.verify();
161159
assertNotNull(arrowBuf2);
162160
assertNotEquals(arrowBuf2, arrowBuf1);
161+
assertEquiv(arrowBuf1, arrowBuf2);
163162

164163
// release original buffer, thus transferring ownership to allocator 2 (should leave allocator 1 in empty state)
165164
arrowBuf1.release();
@@ -172,6 +171,7 @@ public void testAllocator_shareOwnership() throws Exception {
172171
assertNotNull(arrowBuf3);
173172
assertNotEquals(arrowBuf3, arrowBuf1);
174173
assertNotEquals(arrowBuf3, arrowBuf2);
174+
assertEquiv(arrowBuf1, arrowBuf3);
175175
rootAllocator.verify();
176176

177177
arrowBuf2.release();
@@ -452,8 +452,10 @@ public void testAllocator_transferSliced() throws Exception {
452452
rootAllocator.verify();
453453

454454
TransferResult result1 = arrowBuf2s.transferOwnership(childAllocator1);
455+
assertEquiv(arrowBuf2s, result1.buffer);
455456
rootAllocator.verify();
456457
TransferResult result2 = arrowBuf1s.transferOwnership(childAllocator2);
458+
assertEquiv(arrowBuf1s, result2.buffer);
457459
rootAllocator.verify();
458460

459461
result1.buffer.release();
@@ -482,7 +484,9 @@ public void testAllocator_shareSliced() throws Exception {
482484
rootAllocator.verify();
483485

484486
final ArrowBuf arrowBuf2s1 = arrowBuf2s.retain(childAllocator1);
487+
assertEquiv(arrowBuf2s, arrowBuf2s1);
485488
final ArrowBuf arrowBuf1s2 = arrowBuf1s.retain(childAllocator2);
489+
assertEquiv(arrowBuf1s, arrowBuf1s2);
486490
rootAllocator.verify();
487491

488492
arrowBuf1s.release(); // releases arrowBuf1
@@ -512,11 +516,13 @@ public void testAllocator_transferShared() throws Exception {
512516
rootAllocator.verify();
513517
assertNotNull(arrowBuf2);
514518
assertNotEquals(arrowBuf2, arrowBuf1);
519+
assertEquiv(arrowBuf1, arrowBuf2);
515520

516521
TransferResult result = arrowBuf1.transferOwnership(childAllocator3);
517522
allocationFit = result.allocationFit;
518523
final ArrowBuf arrowBuf3 = result.buffer;
519524
assertTrue(allocationFit);
525+
assertEquiv(arrowBuf1, arrowBuf3);
520526
rootAllocator.verify();
521527

522528
// Since childAllocator3 now has childAllocator1's buffer, 1, can close
@@ -533,6 +539,7 @@ public void testAllocator_transferShared() throws Exception {
533539
allocationFit = result.allocationFit;
534540
final ArrowBuf arrowBuf4 = result2.buffer;
535541
assertTrue(allocationFit);
542+
assertEquiv(arrowBuf3, arrowBuf4);
536543
rootAllocator.verify();
537544

538545
arrowBuf3.release();
@@ -645,4 +652,9 @@ public void multiple() throws Exception {
645652

646653
}
647654
}
655+
656+
public void assertEquiv(ArrowBuf origBuf, ArrowBuf newBuf) {
657+
assertEquals(origBuf.readerIndex(), newBuf.readerIndex());
658+
assertEquals(origBuf.writerIndex(), newBuf.writerIndex());
659+
}
648660
}

java/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,5 +467,6 @@
467467
<module>format</module>
468468
<module>memory</module>
469469
<module>vector</module>
470+
<module>tools</module>
470471
</modules>
471472
</project>

java/tools/pom.xml

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
<?xml version="1.0"?>
2+
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
3+
license agreements. See the NOTICE file distributed with this work for additional
4+
information regarding copyright ownership. The ASF licenses this file to
5+
You under the Apache License, Version 2.0 (the "License"); you may not use
6+
this file except in compliance with the License. You may obtain a copy of
7+
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
8+
by applicable law or agreed to in writing, software distributed under the
9+
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
10+
OF ANY KIND, either express or implied. See the License for the specific
11+
language governing permissions and limitations under the License. -->
12+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
13+
<modelVersion>4.0.0</modelVersion>
14+
<parent>
15+
<groupId>org.apache.arrow</groupId>
16+
<artifactId>arrow-java-root</artifactId>
17+
<version>0.1.1-SNAPSHOT</version>
18+
</parent>
19+
<artifactId>arrow-tools</artifactId>
20+
<name>Arrow Tools</name>
21+
22+
<dependencies>
23+
<dependency>
24+
<groupId>org.apache.arrow</groupId>
25+
<artifactId>arrow-format</artifactId>
26+
<version>${project.version}</version>
27+
</dependency>
28+
<dependency>
29+
<groupId>org.apache.arrow</groupId>
30+
<artifactId>arrow-memory</artifactId>
31+
<version>${project.version}</version>
32+
</dependency>
33+
<dependency>
34+
<groupId>org.apache.arrow</groupId>
35+
<artifactId>arrow-vector</artifactId>
36+
<version>${project.version}</version>
37+
</dependency>
38+
<dependency>
39+
<groupId>org.apache.commons</groupId>
40+
<artifactId>commons-lang3</artifactId>
41+
<version>3.4</version>
42+
</dependency>
43+
<dependency>
44+
<groupId>commons-cli</groupId>
45+
<artifactId>commons-cli</artifactId>
46+
<version>1.2</version>
47+
</dependency>
48+
</dependencies>
49+
50+
<build>
51+
<plugins>
52+
<plugin>
53+
<artifactId>maven-assembly-plugin</artifactId>
54+
<version>2.6</version>
55+
<configuration>
56+
<descriptorRefs>
57+
<descriptorRef>jar-with-dependencies</descriptorRef>
58+
</descriptorRefs>
59+
</configuration>
60+
<executions>
61+
<execution>
62+
<id>make-assembly</id>
63+
<phase>package</phase>
64+
<goals>
65+
<goal>single</goal>
66+
</goals>
67+
</execution>
68+
</executions>
69+
</plugin>
70+
</plugins>
71+
</build>
72+
73+
</project>
java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.arrow.tools;
20+
21+
import java.io.File;
22+
import java.io.FileInputStream;
23+
import java.io.FileOutputStream;
24+
import java.io.IOException;
25+
import java.io.PrintStream;
26+
import java.util.List;
27+
28+
import org.apache.arrow.memory.BufferAllocator;
29+
import org.apache.arrow.memory.RootAllocator;
30+
import org.apache.arrow.vector.VectorLoader;
31+
import org.apache.arrow.vector.VectorSchemaRoot;
32+
import org.apache.arrow.vector.VectorUnloader;
33+
import org.apache.arrow.vector.file.ArrowBlock;
34+
import org.apache.arrow.vector.file.ArrowFooter;
35+
import org.apache.arrow.vector.file.ArrowReader;
36+
import org.apache.arrow.vector.file.ArrowWriter;
37+
import org.apache.arrow.vector.schema.ArrowRecordBatch;
38+
import org.apache.arrow.vector.types.pojo.Schema;
39+
import org.apache.commons.cli.CommandLine;
40+
import org.apache.commons.cli.CommandLineParser;
41+
import org.apache.commons.cli.Options;
42+
import org.apache.commons.cli.ParseException;
43+
import org.apache.commons.cli.PosixParser;
44+
import org.slf4j.Logger;
45+
import org.slf4j.LoggerFactory;
46+
47+
public class FileRoundtrip {
48+
private static final Logger LOGGER = LoggerFactory.getLogger(FileRoundtrip.class);
49+
50+
public static void main(String[] args) {
51+
System.exit(new FileRoundtrip(System.out, System.err).run(args));
52+
}
53+
54+
private final Options options;
55+
private final PrintStream out;
56+
private final PrintStream err;
57+
58+
FileRoundtrip(PrintStream out, PrintStream err) {
59+
this.out = out;
60+
this.err = err;
61+
this.options = new Options();
62+
this.options.addOption("i", "in", true, "input file");
63+
this.options.addOption("o", "out", true, "output file");
64+
65+
}
66+
67+
private File validateFile(String type, String fileName) {
68+
if (fileName == null) {
69+
throw new IllegalArgumentException("missing " + type + " file parameter");
70+
}
71+
File f = new File(fileName);
72+
if (!f.exists() || f.isDirectory()) {
73+
throw new IllegalArgumentException(type + " file not found: " + f.getAbsolutePath());
74+
}
75+
return f;
76+
}
77+
78+
int run(String[] args) {
79+
try {
80+
CommandLineParser parser = new PosixParser();
81+
CommandLine cmd = parser.parse(options, args, false);
82+
83+
String inFileName = cmd.getOptionValue("in");
84+
String outFileName = cmd.getOptionValue("out");
85+
86+
File inFile = validateFile("input", inFileName);
87+
File outFile = validateFile("output", outFileName);
88+
BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); // TODO: close
89+
try(
90+
FileInputStream fileInputStream = new FileInputStream(inFile);
91+
ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), allocator);) {
92+
93+
ArrowFooter footer = arrowReader.readFooter();
94+
Schema schema = footer.getSchema();
95+
LOGGER.debug("Input file size: " + inFile.length());
96+
LOGGER.debug("Found schema: " + schema);
97+
98+
try (
99+
FileOutputStream fileOutputStream = new FileOutputStream(outFile);
100+
ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);
101+
) {
102+
103+
// initialize vectors
104+
105+
List<ArrowBlock> recordBatches = footer.getRecordBatches();
106+
for (ArrowBlock rbBlock : recordBatches) {
107+
try (ArrowRecordBatch inRecordBatch = arrowReader.readRecordBatch(rbBlock);
108+
VectorSchemaRoot root = new VectorSchemaRoot(schema, allocator);) {
109+
110+
VectorLoader vectorLoader = new VectorLoader(root);
111+
vectorLoader.load(inRecordBatch);
112+
113+
VectorUnloader vectorUnloader = new VectorUnloader(root);
114+
ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
115+
arrowWriter.writeRecordBatch(recordBatch);
116+
}
117+
}
118+
}
119+
LOGGER.debug("Output file size: " + outFile.length());
120+
}
121+
} catch (ParseException e) {
122+
return fatalError("Invalid parameters", e);
123+
} catch (IOException e) {
124+
return fatalError("Error accessing files", e);
125+
}
126+
return 0;
127+
}
128+
129+
private int fatalError(String message, Throwable e) {
130+
err.println(message);
131+
LOGGER.error(message, e);
132+
return 1;
133+
}
134+
135+
}

0 commit comments

Comments
 (0)