|
| 1 | +/* |
| 2 | + * Licensed to the Apache Software Foundation (ASF) under one |
| 3 | + * or more contributor license agreements. See the NOTICE file |
| 4 | + * distributed with this work for additional information |
| 5 | + * regarding copyright ownership. The ASF licenses this file |
| 6 | + * to you under the Apache License, Version 2.0 (the |
| 7 | + * "License"); you may not use this file except in compliance |
| 8 | + * with the License. You may obtain a copy of the License at |
| 9 | + * |
| 10 | + * http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | + * |
| 12 | + * Unless required by applicable law or agreed to in writing, |
| 13 | + * software distributed under the License is distributed on an |
| 14 | + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 15 | + * KIND, either express or implied. See the License for the |
| 16 | + * specific language governing permissions and limitations |
| 17 | + * under the License. |
| 18 | + */ |
| 19 | +package org.apache.arrow.tools; |
| 20 | + |
| 21 | +import static org.junit.Assert.assertEquals; |
| 22 | + |
| 23 | +import java.io.File; |
| 24 | +import java.io.FileInputStream; |
| 25 | +import java.io.FileNotFoundException; |
| 26 | +import java.io.FileOutputStream; |
| 27 | +import java.io.IOException; |
| 28 | +import java.util.List; |
| 29 | + |
| 30 | +import org.apache.arrow.memory.BufferAllocator; |
| 31 | +import org.apache.arrow.memory.RootAllocator; |
| 32 | +import org.apache.arrow.vector.FieldVector; |
| 33 | +import org.apache.arrow.vector.VectorLoader; |
| 34 | +import org.apache.arrow.vector.VectorSchemaRoot; |
| 35 | +import org.apache.arrow.vector.VectorUnloader; |
| 36 | +import org.apache.arrow.vector.complex.MapVector; |
| 37 | +import org.apache.arrow.vector.complex.impl.ComplexWriterImpl; |
| 38 | +import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter; |
| 39 | +import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter; |
| 40 | +import org.apache.arrow.vector.complex.writer.BigIntWriter; |
| 41 | +import org.apache.arrow.vector.complex.writer.IntWriter; |
| 42 | +import org.apache.arrow.vector.file.ArrowBlock; |
| 43 | +import org.apache.arrow.vector.file.ArrowFooter; |
| 44 | +import org.apache.arrow.vector.file.ArrowReader; |
| 45 | +import org.apache.arrow.vector.file.ArrowWriter; |
| 46 | +import org.apache.arrow.vector.schema.ArrowRecordBatch; |
| 47 | +import org.apache.arrow.vector.types.pojo.Schema; |
| 48 | +import org.junit.After; |
| 49 | +import org.junit.Assert; |
| 50 | +import org.junit.Before; |
| 51 | +import org.junit.Rule; |
| 52 | +import org.junit.Test; |
| 53 | +import org.junit.rules.TemporaryFolder; |
| 54 | + |
| 55 | +public class TestFileRoundtrip { |
| 56 | + private static final int COUNT = 10; |
| 57 | + |
| 58 | + @Rule |
| 59 | + public TemporaryFolder testFolder = new TemporaryFolder(); |
| 60 | + |
| 61 | + private BufferAllocator allocator; |
| 62 | + |
| 63 | + @Before |
| 64 | + public void init() { |
| 65 | + allocator = new RootAllocator(Integer.MAX_VALUE); |
| 66 | + } |
| 67 | + |
| 68 | + @After |
| 69 | + public void tearDown() { |
| 70 | + allocator.close(); |
| 71 | + } |
| 72 | + |
| 73 | + private void writeData(int count, MapVector parent) { |
| 74 | + ComplexWriter writer = new ComplexWriterImpl("root", parent); |
| 75 | + MapWriter rootWriter = writer.rootAsMap(); |
| 76 | + IntWriter intWriter = rootWriter.integer("int"); |
| 77 | + BigIntWriter bigIntWriter = rootWriter.bigInt("bigInt"); |
| 78 | + for (int i = 0; i < count; i++) { |
| 79 | + intWriter.setPosition(i); |
| 80 | + intWriter.writeInt(i); |
| 81 | + bigIntWriter.setPosition(i); |
| 82 | + bigIntWriter.writeBigInt(i); |
| 83 | + } |
| 84 | + writer.setValueCount(count); |
| 85 | + } |
| 86 | + |
| 87 | + @Test |
| 88 | + public void test() throws Exception { |
| 89 | + File testInFile = testFolder.newFile("testIn.arrow"); |
| 90 | + File testOutFile = testFolder.newFile("testOut.arrow"); |
| 91 | + |
| 92 | + writeInput(testInFile); |
| 93 | + |
| 94 | + String[] args = { "-i", testInFile.getAbsolutePath(), "-o", testOutFile.getAbsolutePath()}; |
| 95 | + int result = new FileRoundtrip(System.out, System.err).run(args); |
| 96 | + assertEquals(0, result); |
| 97 | + |
| 98 | + validateOutput(testOutFile); |
| 99 | + } |
| 100 | + |
| 101 | + private void validateOutput(File testOutFile) throws Exception { |
| 102 | + // read |
| 103 | + try ( |
| 104 | + BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); |
| 105 | + FileInputStream fileInputStream = new FileInputStream(testOutFile); |
| 106 | + ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator); |
| 107 | + BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE); |
| 108 | + ) { |
| 109 | + ArrowFooter footer = arrowReader.readFooter(); |
| 110 | + Schema schema = footer.getSchema(); |
| 111 | + |
| 112 | + // initialize vectors |
| 113 | + try (VectorSchemaRoot root = new VectorSchemaRoot(schema, readerAllocator)) { |
| 114 | + VectorLoader vectorLoader = new VectorLoader(root); |
| 115 | + |
| 116 | + List<ArrowBlock> recordBatches = footer.getRecordBatches(); |
| 117 | + for (ArrowBlock rbBlock : recordBatches) { |
| 118 | + try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) { |
| 119 | + vectorLoader.load(recordBatch); |
| 120 | + } |
| 121 | + validateContent(COUNT, root); |
| 122 | + } |
| 123 | + } |
| 124 | + } |
| 125 | + } |
| 126 | + |
| 127 | + private void validateContent(int count, VectorSchemaRoot root) { |
| 128 | + Assert.assertEquals(count, root.getRowCount()); |
| 129 | + for (int i = 0; i < count; i++) { |
| 130 | + Assert.assertEquals(i, root.getVector("int").getAccessor().getObject(i)); |
| 131 | + Assert.assertEquals(Long.valueOf(i), root.getVector("bigInt").getAccessor().getObject(i)); |
| 132 | + } |
| 133 | + } |
| 134 | + |
| 135 | + public void writeInput(File testInFile) throws FileNotFoundException, IOException { |
| 136 | + int count = COUNT; |
| 137 | + try ( |
| 138 | + BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); |
| 139 | + MapVector parent = new MapVector("parent", vectorAllocator, null)) { |
| 140 | + writeData(count, parent); |
| 141 | + write(parent.getChild("root"), testInFile); |
| 142 | + } |
| 143 | + } |
| 144 | + |
| 145 | + private void write(FieldVector parent, File file) throws FileNotFoundException, IOException { |
| 146 | + Schema schema = new Schema(parent.getField().getChildren()); |
| 147 | + int valueCount = parent.getAccessor().getValueCount(); |
| 148 | + List<FieldVector> fields = parent.getChildrenFromFields(); |
| 149 | + VectorUnloader vectorUnloader = new VectorUnloader(schema, valueCount, fields); |
| 150 | + try ( |
| 151 | + FileOutputStream fileOutputStream = new FileOutputStream(file); |
| 152 | + ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema); |
| 153 | + ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch(); |
| 154 | + ) { |
| 155 | + arrowWriter.writeRecordBatch(recordBatch); |
| 156 | + } |
| 157 | + } |
| 158 | + |
| 159 | +} |
0 commit comments