|
18 | 18 | */ |
19 | 19 | package org.apache.arrow.tools; |
20 | 20 |
|
| 21 | +import java.io.File; |
| 22 | +import java.io.FileInputStream; |
| 23 | +import java.io.FileOutputStream; |
| 24 | +import java.io.IOException; |
| 25 | +import java.io.PrintStream; |
| 26 | +import java.util.List; |
| 27 | + |
21 | 28 | import org.apache.arrow.memory.BufferAllocator; |
22 | 29 | import org.apache.arrow.memory.RootAllocator; |
23 | 30 | import org.apache.arrow.vector.VectorLoader; |
24 | 31 | import org.apache.arrow.vector.VectorUnloader; |
| 32 | +import org.apache.arrow.vector.complex.MapVector; |
25 | 33 | import org.apache.arrow.vector.complex.NullableMapVector; |
26 | 34 | import org.apache.arrow.vector.file.ArrowBlock; |
27 | 35 | import org.apache.arrow.vector.file.ArrowFooter; |
|
32 | 40 | import org.apache.arrow.vector.types.pojo.Schema; |
33 | 41 | import org.apache.commons.cli.CommandLine; |
34 | 42 | import org.apache.commons.cli.CommandLineParser; |
35 | | -import org.apache.commons.cli.Option; |
36 | | -import org.apache.commons.cli.OptionBuilder; |
37 | 43 | import org.apache.commons.cli.Options; |
| 44 | +import org.apache.commons.cli.ParseException; |
38 | 45 | import org.apache.commons.cli.PosixParser; |
39 | 46 | import org.slf4j.Logger; |
40 | 47 | import org.slf4j.LoggerFactory; |
41 | 48 |
|
42 | | -import java.io.File; |
43 | | -import java.io.FileInputStream; |
44 | | -import java.io.FileOutputStream; |
45 | | -import java.util.Arrays; |
46 | | -import java.util.List; |
47 | | - |
48 | 49 | public class FileRoundtrip { |
49 | 50 | private static final Logger LOGGER = LoggerFactory.getLogger(FileRoundtrip.class); |
50 | | - public static final Options OPTIONS; |
51 | | - |
52 | | - static { |
53 | | - OPTIONS = new Options(); |
54 | | - } |
55 | 51 |
|
56 | 52 | public static void main(String[] args) { |
57 | | - try { |
58 | | - String[] cargs = Arrays.copyOfRange(args, 1, args.length); |
59 | | - |
60 | | - CommandLineParser parser = new PosixParser(); |
61 | | - CommandLine cmd = parser.parse(OPTIONS, cargs, false); |
62 | | - |
63 | | - String[] parsed_args = cmd.getArgs(); |
64 | | - |
65 | | - String inFileName = parsed_args[0]; |
66 | | - String outFileName = parsed_args[1]; |
67 | | - |
68 | | - BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); |
69 | | - |
70 | | - File inFile = new File(inFileName); |
71 | | - FileInputStream fileInputStream = new FileInputStream(inFile); |
72 | | - ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), allocator); |
73 | | - |
74 | | - ArrowFooter footer = arrowReader.readFooter(); |
75 | | - Schema schema = footer.getSchema(); |
76 | | - LOGGER.debug("Found schema: " + schema); |
| 53 | + System.exit(new FileRoundtrip(System.out, System.err).run(args)); |
| 54 | + } |
77 | 55 |
|
78 | | - File outFile = new File(outFileName); |
79 | | - FileOutputStream fileOutputStream = new FileOutputStream(outFile); |
80 | | - ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema); |
| 56 | + private final Options options; |
| 57 | + private final PrintStream out; |
| 58 | + private final PrintStream err; |
81 | 59 |
|
82 | | - // initialize vectors |
| 60 | + FileRoundtrip(PrintStream out, PrintStream err) { |
| 61 | + this.out = out; |
| 62 | + this.err = err; |
| 63 | + this.options = new Options(); |
| 64 | + this.options.addOption("i", "in", true, "input file"); |
| 65 | + this.options.addOption("o", "out", true, "output file"); |
83 | 66 |
|
84 | | - List<ArrowBlock> recordBatches = footer.getRecordBatches(); |
85 | | - for (ArrowBlock rbBlock : recordBatches) { |
86 | | - ArrowRecordBatch inRecordBatch = arrowReader.readRecordBatch(rbBlock); |
| 67 | + } |
87 | 68 |
|
88 | | - NullableMapVector inParent = new NullableMapVector("parent", allocator, null); |
89 | | - NullableMapVector root = inParent.addOrGet("root", Types.MinorType.MAP, NullableMapVector.class); |
90 | | - VectorLoader vectorLoader = new VectorLoader(schema, root); |
91 | | - vectorLoader.load(inRecordBatch); |
| 69 | + private File validateFile(String type, String fileName) { |
| 70 | + if (fileName == null) { |
| 71 | + throw new IllegalArgumentException("missing " + type + " file parameter"); |
| 72 | + } |
| 73 | + File f = new File(fileName); |
| 74 | + if (!f.exists() || f.isDirectory()) { |
| 75 | + throw new IllegalArgumentException(type + " file not found: " + f.getAbsolutePath()); |
| 76 | + } |
| 77 | + return f; |
| 78 | + } |
92 | 79 |
|
93 | | - NullableMapVector outParent = new NullableMapVector("parent", allocator, null); |
94 | | - VectorUnloader vectorUnloader = new VectorUnloader(outParent); |
95 | | - ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch(); |
96 | | - arrowWriter.writeRecordBatch(recordBatch); |
| 80 | + int run(String[] args) { |
| 81 | + try { |
| 82 | + CommandLineParser parser = new PosixParser(); |
| 83 | + CommandLine cmd = parser.parse(options, args, false); |
| 84 | + |
| 85 | + String inFileName = cmd.getOptionValue("in"); |
| 86 | + String outFileName = cmd.getOptionValue("out"); |
| 87 | + |
| 88 | + File inFile = validateFile("input", inFileName); |
| 89 | + File outFile = validateFile("output", outFileName); |
| 90 | + BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); // TODO: close |
| 91 | + try( |
| 92 | + FileInputStream fileInputStream = new FileInputStream(inFile); |
| 93 | + ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), allocator);) { |
| 94 | + |
| 95 | + ArrowFooter footer = arrowReader.readFooter(); |
| 96 | + Schema schema = footer.getSchema(); |
| 97 | + LOGGER.debug("Input file size: " + inFile.length()); |
| 98 | + LOGGER.debug("Found schema: " + schema); |
| 99 | + |
| 100 | + try ( |
| 101 | + FileOutputStream fileOutputStream = new FileOutputStream(outFile); |
| 102 | + ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema); |
| 103 | + ) { |
| 104 | + |
| 105 | + // initialize vectors |
| 106 | + |
| 107 | + List<ArrowBlock> recordBatches = footer.getRecordBatches(); |
| 108 | + for (ArrowBlock rbBlock : recordBatches) { |
| 109 | + try (ArrowRecordBatch inRecordBatch = arrowReader.readRecordBatch(rbBlock); |
| 110 | + MapVector parent = new MapVector("parent", allocator, null);) { |
| 111 | + |
| 112 | + NullableMapVector root = parent.addOrGet("root", Types.MinorType.MAP, NullableMapVector.class); |
| 113 | + VectorLoader vectorLoader = new VectorLoader(schema, root); |
| 114 | + vectorLoader.load(inRecordBatch); |
| 115 | + |
| 116 | + // NullableMapVector outParent = new NullableMapVector("parent", allocator, null); |
| 117 | + VectorUnloader vectorUnloader = new VectorUnloader(root); |
| 118 | + ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch(); |
| 119 | + arrowWriter.writeRecordBatch(recordBatch); |
| 120 | + } |
| 121 | + } |
| 122 | + } |
| 123 | + LOGGER.debug("Output file size: " + outFile.length()); |
97 | 124 | } |
98 | | - } catch (Throwable th) { |
99 | | - System.err.println(th.getMessage()); |
| 125 | + } catch (ParseException e) { |
| 126 | + return fatalError("Invalid parameters", e); |
| 127 | + } catch (IOException e) { |
| 128 | + return fatalError("Error accessing files", e); |
100 | 129 | } |
| 130 | + return 0; |
101 | 131 | } |
| 132 | + |
| 133 | + private int fatalError(String message, Throwable e) { |
| 134 | + err.println(message); |
| 135 | + LOGGER.error(message, e); |
| 136 | + return 1; |
| 137 | + } |
| 138 | + |
102 | 139 | } |
0 commit comments