java/tools/src/main/java/org/apache/arrow/tools/Integration.java — 262 additions, 0 deletions
@@ -0,0 +1,262 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.arrow.tools;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.file.ArrowBlock;
import org.apache.arrow.vector.file.ArrowFooter;
import org.apache.arrow.vector.file.ArrowReader;
import org.apache.arrow.vector.file.ArrowWriter;
import org.apache.arrow.vector.file.json.JsonFileReader;
import org.apache.arrow.vector.file.json.JsonFileWriter;
import org.apache.arrow.vector.schema.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Objects;

public class Integration {
private static final Logger LOGGER = LoggerFactory.getLogger(Integration.class);

public static void main(String[] args) {
try {
new Integration().run(args);
} catch (ParseException e) {
fatalError("Invalid parameters", e);
} catch (IOException e) {
fatalError("Error accessing files", e);
} catch (RuntimeException e) {
fatalError("Incompatible files", e);
}
}

private final Options options;

enum Command {
ARROW_TO_JSON(true, false) {
@Override
public void execute(File arrowFile, File jsonFile) throws IOException {
try(
BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
FileInputStream fileInputStream = new FileInputStream(arrowFile);
ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), allocator);) {
ArrowFooter footer = arrowReader.readFooter();
Schema schema = footer.getSchema();
LOGGER.debug("Input file size: " + arrowFile.length());
LOGGER.debug("Found schema: " + schema);
try (JsonFileWriter writer = new JsonFileWriter(jsonFile);) {
writer.start(schema);
List<ArrowBlock> recordBatches = footer.getRecordBatches();
for (ArrowBlock rbBlock : recordBatches) {
try (ArrowRecordBatch inRecordBatch = arrowReader.readRecordBatch(rbBlock);
VectorSchemaRoot root = new VectorSchemaRoot(schema, allocator);) {
VectorLoader vectorLoader = new VectorLoader(root);
vectorLoader.load(inRecordBatch);
writer.write(root);
}
}
}
LOGGER.debug("Output file size: " + jsonFile.length());
}
}
},
JSON_TO_ARROW(false, true) {
@Override
public void execute(File arrowFile, File jsonFile) throws IOException {
try (
BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
JsonFileReader reader = new JsonFileReader(jsonFile, allocator);
) {
Schema schema = reader.start();
LOGGER.debug("Input file size: " + jsonFile.length());
LOGGER.debug("Found schema: " + schema);
try (
FileOutputStream fileOutputStream = new FileOutputStream(arrowFile);
ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);
) {

// initialize vectors
VectorSchemaRoot root;
while ((root = reader.read()) != null) {
VectorUnloader vectorUnloader = new VectorUnloader(root);
try (ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();) {
arrowWriter.writeRecordBatch(recordBatch);
}
root.close();
}
}
LOGGER.debug("Output file size: " + arrowFile.length());
}
}
},
VALIDATE(true, true) {
@Override
public void execute(File arrowFile, File jsonFile) throws IOException {
try (
BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
JsonFileReader jsonReader = new JsonFileReader(jsonFile, allocator);
FileInputStream fileInputStream = new FileInputStream(arrowFile);
ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), allocator);
) {
Schema jsonSchema = jsonReader.start();
ArrowFooter footer = arrowReader.readFooter();
Schema arrowSchema = footer.getSchema();
LOGGER.debug("Arrow Input file size: " + arrowFile.length());
LOGGER.debug("ARROW schema: " + arrowSchema);
LOGGER.debug("JSON Input file size: " + jsonFile.length());
LOGGER.debug("JSON schema: " + jsonSchema);
compareSchemas(jsonSchema, arrowSchema);

List<ArrowBlock> recordBatches = footer.getRecordBatches();
Iterator<ArrowBlock> iterator = recordBatches.iterator();
VectorSchemaRoot jsonRoot;
while ((jsonRoot = jsonReader.read()) != null && iterator.hasNext()) {
ArrowBlock rbBlock = iterator.next();
try (ArrowRecordBatch inRecordBatch = arrowReader.readRecordBatch(rbBlock);
VectorSchemaRoot arrowRoot = new VectorSchemaRoot(arrowSchema, allocator);) {
VectorLoader vectorLoader = new VectorLoader(arrowRoot);
vectorLoader.load(inRecordBatch);
// compare the batch loaded from the Arrow file against the JSON-sourced batch
compare(arrowRoot, jsonRoot);
}
jsonRoot.close();
}
boolean hasMoreJSON = jsonRoot != null;
boolean hasMoreArrow = iterator.hasNext();
if (hasMoreJSON || hasMoreArrow) {
throw new IllegalArgumentException("Unexpected RecordBatches. J:" + hasMoreJSON + " A:" + hasMoreArrow);
}
}
}
};

public final boolean arrowExists;
public final boolean jsonExists;

Command(boolean arrowExists, boolean jsonExists) {
this.arrowExists = arrowExists;
this.jsonExists = jsonExists;
}

abstract public void execute(File arrowFile, File jsonFile) throws IOException;

}

Integration() {
this.options = new Options();
this.options.addOption("a", "arrow", true, "arrow file");
this.options.addOption("j", "json", true, "json file");
this.options.addOption("c", "command", true, "command to execute: " + Arrays.toString(Command.values()));
}

private File validateFile(String type, String fileName, boolean shouldExist) {
if (fileName == null) {
throw new IllegalArgumentException("missing " + type + " file parameter");
}
File f = new File(fileName);
if (shouldExist && (!f.exists() || f.isDirectory())) {
throw new IllegalArgumentException(type + " file not found: " + f.getAbsolutePath());
}
if (!shouldExist && f.exists()) {
throw new IllegalArgumentException(type + " file already exists: " + f.getAbsolutePath());
}
return f;
}

void run(String[] args) throws ParseException, IOException {
CommandLineParser parser = new PosixParser();
CommandLine cmd = parser.parse(options, args, false);


Command command = toCommand(cmd.getOptionValue("command"));
File arrowFile = validateFile("arrow", cmd.getOptionValue("arrow"), command.arrowExists);
File jsonFile = validateFile("json", cmd.getOptionValue("json"), command.jsonExists);
command.execute(arrowFile, jsonFile);
}

private Command toCommand(String commandName) {
try {
return Command.valueOf(commandName);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Unknown command: " + commandName + " expected one of " + Arrays.toString(Command.values()));
}
}

private static void fatalError(String message, Throwable e) {
System.err.println(message);
LOGGER.error(message, e);
System.exit(1);
}


private static void compare(VectorSchemaRoot arrowRoot, VectorSchemaRoot jsonRoot) {
compareSchemas(jsonRoot.getSchema(), arrowRoot.getSchema());
if (arrowRoot.getRowCount() != jsonRoot.getRowCount()) {
throw new IllegalArgumentException("Different row count:\n" + arrowRoot.getRowCount() + "\n" + jsonRoot.getRowCount());
}
List<FieldVector> arrowVectors = arrowRoot.getFieldVectors();
List<FieldVector> jsonVectors = jsonRoot.getFieldVectors();
if (arrowVectors.size() != jsonVectors.size()) {
throw new IllegalArgumentException("Different column count:\n" + arrowVectors.size() + "\n" + jsonVectors.size());
}
for (int i = 0; i < arrowVectors.size(); i++) {
Field field = arrowRoot.getSchema().getFields().get(i);
FieldVector arrowVector = arrowVectors.get(i);
FieldVector jsonVector = jsonVectors.get(i);
int valueCount = arrowVector.getAccessor().getValueCount();
if (valueCount != jsonVector.getAccessor().getValueCount()) {
throw new IllegalArgumentException("Different value count for field " + field + " : " + valueCount + " != " + jsonVector.getAccessor().getValueCount());
}
for (int j = 0; j < valueCount; j++) {
Object arrow = arrowVector.getAccessor().getObject(j);
Object json = jsonVector.getAccessor().getObject(j);
if (!Objects.equal(arrow, json)) {
[Review comment — Member] This works for nested types and nulls?
[Reply — Author] Yes. Objects.equals takes care of null; getObject(index) materializes nested types and lists.
[Note: a short sketch of this equality behavior follows at the end of this file.]
throw new IllegalArgumentException(
"Different values in column:\n" + field + " at index " + j + ": " + arrow + " != " + json);
}
}
}
}

private static void compareSchemas(Schema jsonSchema, Schema arrowSchema) {
if (!arrowSchema.equals(jsonSchema)) {
throw new IllegalArgumentException("Different schemas:\n" + arrowSchema + "\n" + jsonSchema);
}
}
}
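
The review exchange above turns on how compare() treats nulls and nested values. Below is a minimal, hedged sketch of that equality behavior using plain Guava and JDK types only; the lists merely stand in for the kind of materialized values getObject(index) can return, and the class name is hypothetical.

import java.util.Arrays;
import java.util.List;

import com.google.common.base.Objects;

public class EqualityDemo {
  public static void main(String[] args) {
    // Guava's Objects.equal is null-safe: two nulls are equal, null vs. non-null is not.
    System.out.println(Objects.equal(null, null)); // true
    System.out.println(Objects.equal(null, 7));    // false

    // Materialized nested values (lists standing in here) are compared by value,
    // including embedded nulls.
    List<Integer> fromArrow = Arrays.asList(1, 2, null);
    List<Integer> fromJson = Arrays.asList(1, 2, null);
    System.out.println(Objects.equal(fromArrow, fromJson)); // true
  }
}

The VALIDATE command applies exactly this check to every column and every index, so a single differing value fails the run.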
ArrowFileTestFixtures.java (org.apache.arrow.tools) — 122 additions, 0 deletions
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.arrow.tools;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.impl.ComplexWriterImpl;
import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter;
import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter;
import org.apache.arrow.vector.complex.writer.BigIntWriter;
import org.apache.arrow.vector.complex.writer.IntWriter;
import org.apache.arrow.vector.file.ArrowBlock;
import org.apache.arrow.vector.file.ArrowFooter;
import org.apache.arrow.vector.file.ArrowReader;
import org.apache.arrow.vector.file.ArrowWriter;
import org.apache.arrow.vector.schema.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.Assert;

public class ArrowFileTestFixtures {
static final int COUNT = 10;

static void writeData(int count, MapVector parent) {
[Review comment — Member] Come to think of it, many of these test cases could use some null values.
[Reply — Author] Good point.
[Reply — Author] Sorry, I was busy this week, but I will add more tests as suggested.
[Note: a sketch of fixture data with null values follows at the end of this file.]
ComplexWriter writer = new ComplexWriterImpl("root", parent);
MapWriter rootWriter = writer.rootAsMap();
IntWriter intWriter = rootWriter.integer("int");
BigIntWriter bigIntWriter = rootWriter.bigInt("bigInt");
for (int i = 0; i < count; i++) {
intWriter.setPosition(i);
intWriter.writeInt(i);
bigIntWriter.setPosition(i);
bigIntWriter.writeBigInt(i);
}
writer.setValueCount(count);
}

static void validateOutput(File testOutFile, BufferAllocator allocator) throws Exception {
// read
try (
BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
FileInputStream fileInputStream = new FileInputStream(testOutFile);
ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator);
BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
) {
ArrowFooter footer = arrowReader.readFooter();
Schema schema = footer.getSchema();

// initialize vectors
try (VectorSchemaRoot root = new VectorSchemaRoot(schema, readerAllocator)) {
VectorLoader vectorLoader = new VectorLoader(root);

List<ArrowBlock> recordBatches = footer.getRecordBatches();
for (ArrowBlock rbBlock : recordBatches) {
try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
vectorLoader.load(recordBatch);
}
validateContent(COUNT, root);
}
}
}
}

static void validateContent(int count, VectorSchemaRoot root) {
Assert.assertEquals(count, root.getRowCount());
for (int i = 0; i < count; i++) {
Assert.assertEquals(i, root.getVector("int").getAccessor().getObject(i));
Assert.assertEquals(Long.valueOf(i), root.getVector("bigInt").getAccessor().getObject(i));
}
}

static void write(FieldVector parent, File file) throws FileNotFoundException, IOException {
Schema schema = new Schema(parent.getField().getChildren());
int valueCount = parent.getAccessor().getValueCount();
List<FieldVector> fields = parent.getChildrenFromFields();
VectorUnloader vectorUnloader = new VectorUnloader(schema, valueCount, fields);
try (
FileOutputStream fileOutputStream = new FileOutputStream(file);
ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);
ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
) {
arrowWriter.writeRecordBatch(recordBatch);
}
}


static void writeInput(File testInFile, BufferAllocator allocator) throws FileNotFoundException, IOException {
int count = ArrowFileTestFixtures.COUNT;
try (
BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
MapVector parent = new MapVector("parent", vectorAllocator, null)) {
writeData(count, parent);
write(parent.getChild("root"), testInFile);
}
}
}
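
As a possible follow-up to the review exchange in writeData, here is a hedged sketch of fixture data that includes nulls. It is meant to sit alongside writeData in ArrowFileTestFixtures (reusing the same imports) and uses the same writer calls; the method name is hypothetical, and it assumes — not verified here — that a position never written through the writers reads back as null.

  // Hypothetical variant of writeData that leaves every third position unwritten.
  static void writeDataWithNulls(int count, MapVector parent) {
    ComplexWriter writer = new ComplexWriterImpl("root", parent);
    MapWriter rootWriter = writer.rootAsMap();
    IntWriter intWriter = rootWriter.integer("int");
    BigIntWriter bigIntWriter = rootWriter.bigInt("bigInt");
    for (int i = 0; i < count; i++) {
      if (i % 3 == 0) {
        continue; // assumption: an unwritten position is read back as null
      }
      intWriter.setPosition(i);
      intWriter.writeInt(i);
      bigIntWriter.setPosition(i);
      bigIntWriter.writeBigInt(i);
    }
    writer.setValueCount(count);
  }

A matching validateContent would then assert that getObject(i) returns null at the skipped indexes and the written values elsewhere.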