Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion java/memory/src/main/java/io/netty/buffer/ArrowBuf.java
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,10 @@ public ArrowBuf retain(BufferAllocator target) {
historicalLog.recordEvent("retain(%s)", target.getName());
}
final BufferLedger otherLedger = this.ledger.getLedgerForAllocator(target);
return otherLedger.newArrowBuf(offset, length, null);
ArrowBuf newArrowBuf = otherLedger.newArrowBuf(offset, length, null);
newArrowBuf.readerIndex(this.readerIndex);
newArrowBuf.writerIndex(this.writerIndex);
return newArrowBuf;
}

/**
Expand Down Expand Up @@ -214,6 +217,8 @@ public TransferResult transferOwnership(BufferAllocator target) {

final BufferLedger otherLedger = this.ledger.getLedgerForAllocator(target);
final ArrowBuf newBuf = otherLedger.newArrowBuf(offset, length, null);
newBuf.readerIndex(this.readerIndex);
newBuf.writerIndex(this.writerIndex);
final boolean allocationFit = this.ledger.transferBalance(otherLedger);
return new TransferResult(allocationFit, newBuf);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,13 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import io.netty.buffer.ArrowBuf;
import io.netty.buffer.ArrowBuf.TransferResult;

import org.apache.arrow.memory.AllocationReservation;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.OutOfMemoryException;
import org.apache.arrow.memory.RootAllocator;
import org.junit.Ignore;
import org.junit.Test;

import io.netty.buffer.ArrowBuf;
import io.netty.buffer.ArrowBuf.TransferResult;

public class TestBaseAllocator {
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestBaseAllocator.class);

Expand Down Expand Up @@ -134,6 +131,7 @@ public void testAllocator_transferOwnership() throws Exception {
final ArrowBuf arrowBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 4);
rootAllocator.verify();
TransferResult transferOwnership = arrowBuf1.transferOwnership(childAllocator2);
assertEquiv(arrowBuf1, transferOwnership.buffer);
final boolean allocationFit = transferOwnership.allocationFit;
rootAllocator.verify();
assertTrue(allocationFit);
Expand All @@ -160,6 +158,7 @@ public void testAllocator_shareOwnership() throws Exception {
rootAllocator.verify();
assertNotNull(arrowBuf2);
assertNotEquals(arrowBuf2, arrowBuf1);
assertEquiv(arrowBuf1, arrowBuf2);

// release original buffer (thus transferring ownership to allocator 2. (should leave allocator 1 in empty state)
arrowBuf1.release();
Expand All @@ -172,6 +171,7 @@ public void testAllocator_shareOwnership() throws Exception {
assertNotNull(arrowBuf3);
assertNotEquals(arrowBuf3, arrowBuf1);
assertNotEquals(arrowBuf3, arrowBuf2);
assertEquiv(arrowBuf1, arrowBuf3);
rootAllocator.verify();

arrowBuf2.release();
Expand Down Expand Up @@ -452,8 +452,10 @@ public void testAllocator_transferSliced() throws Exception {
rootAllocator.verify();

TransferResult result1 = arrowBuf2s.transferOwnership(childAllocator1);
assertEquiv(arrowBuf2s, result1.buffer);
rootAllocator.verify();
TransferResult result2 = arrowBuf1s.transferOwnership(childAllocator2);
assertEquiv(arrowBuf1s, result2.buffer);
rootAllocator.verify();

result1.buffer.release();
Expand Down Expand Up @@ -482,7 +484,9 @@ public void testAllocator_shareSliced() throws Exception {
rootAllocator.verify();

final ArrowBuf arrowBuf2s1 = arrowBuf2s.retain(childAllocator1);
assertEquiv(arrowBuf2s, arrowBuf2s1);
final ArrowBuf arrowBuf1s2 = arrowBuf1s.retain(childAllocator2);
assertEquiv(arrowBuf1s, arrowBuf1s2);
rootAllocator.verify();

arrowBuf1s.release(); // releases arrowBuf1
Expand Down Expand Up @@ -512,11 +516,13 @@ public void testAllocator_transferShared() throws Exception {
rootAllocator.verify();
assertNotNull(arrowBuf2);
assertNotEquals(arrowBuf2, arrowBuf1);
assertEquiv(arrowBuf1, arrowBuf2);

TransferResult result = arrowBuf1.transferOwnership(childAllocator3);
allocationFit = result.allocationFit;
final ArrowBuf arrowBuf3 = result.buffer;
assertTrue(allocationFit);
assertEquiv(arrowBuf1, arrowBuf3);
rootAllocator.verify();

// Since childAllocator3 now has childAllocator1's buffer, 1, can close
Expand All @@ -533,6 +539,7 @@ public void testAllocator_transferShared() throws Exception {
allocationFit = result.allocationFit;
final ArrowBuf arrowBuf4 = result2.buffer;
assertTrue(allocationFit);
assertEquiv(arrowBuf3, arrowBuf4);
rootAllocator.verify();

arrowBuf3.release();
Expand Down Expand Up @@ -645,4 +652,9 @@ public void multiple() throws Exception {

}
}

public void assertEquiv(ArrowBuf origBuf, ArrowBuf newBuf) {
assertEquals(origBuf.readerIndex(), newBuf.readerIndex());
assertEquals(origBuf.writerIndex(), newBuf.writerIndex());
}
}
1 change: 1 addition & 0 deletions java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -467,5 +467,6 @@
<module>format</module>
<module>memory</module>
<module>vector</module>
<module>tools</module>
</modules>
</project>
73 changes: 73 additions & 0 deletions java/tools/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
<?xml version="1.0"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-java-root</artifactId>
<version>0.1.1-SNAPSHOT</version>
</parent>
<artifactId>arrow-tools</artifactId>
<name>Arrow Tools</name>

<dependencies>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-format</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.2</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.6</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
135 changes: 135 additions & 0 deletions java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.arrow.tools;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.List;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.file.ArrowBlock;
import org.apache.arrow.vector.file.ArrowFooter;
import org.apache.arrow.vector.file.ArrowReader;
import org.apache.arrow.vector.file.ArrowWriter;
import org.apache.arrow.vector.schema.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileRoundtrip {
private static final Logger LOGGER = LoggerFactory.getLogger(FileRoundtrip.class);

public static void main(String[] args) {
System.exit(new FileRoundtrip(System.out, System.err).run(args));
}

private final Options options;
private final PrintStream out;
private final PrintStream err;

FileRoundtrip(PrintStream out, PrintStream err) {
this.out = out;
this.err = err;
this.options = new Options();
this.options.addOption("i", "in", true, "input file");
this.options.addOption("o", "out", true, "output file");

}

private File validateFile(String type, String fileName) {
if (fileName == null) {
throw new IllegalArgumentException("missing " + type + " file parameter");
}
File f = new File(fileName);
if (!f.exists() || f.isDirectory()) {
throw new IllegalArgumentException(type + " file not found: " + f.getAbsolutePath());
}
return f;
}

int run(String[] args) {
try {
CommandLineParser parser = new PosixParser();
CommandLine cmd = parser.parse(options, args, false);

String inFileName = cmd.getOptionValue("in");
String outFileName = cmd.getOptionValue("out");

File inFile = validateFile("input", inFileName);
File outFile = validateFile("output", outFileName);
BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); // TODO: close
try(
FileInputStream fileInputStream = new FileInputStream(inFile);
ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), allocator);) {

ArrowFooter footer = arrowReader.readFooter();
Schema schema = footer.getSchema();
LOGGER.debug("Input file size: " + inFile.length());
LOGGER.debug("Found schema: " + schema);

try (
FileOutputStream fileOutputStream = new FileOutputStream(outFile);
ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);
) {

// initialize vectors

List<ArrowBlock> recordBatches = footer.getRecordBatches();
for (ArrowBlock rbBlock : recordBatches) {
try (ArrowRecordBatch inRecordBatch = arrowReader.readRecordBatch(rbBlock);
VectorSchemaRoot root = new VectorSchemaRoot(schema, allocator);) {

VectorLoader vectorLoader = new VectorLoader(root);
vectorLoader.load(inRecordBatch);

VectorUnloader vectorUnloader = new VectorUnloader(root);
ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
arrowWriter.writeRecordBatch(recordBatch);
}
}
}
LOGGER.debug("Output file size: " + outFile.length());
}
} catch (ParseException e) {
return fatalError("Invalid parameters", e);
} catch (IOException e) {
return fatalError("Error accessing files", e);
}
return 0;
}

private int fatalError(String message, Throwable e) {
err.println(message);
LOGGER.error(message, e);
return 1;
}

}
Loading